<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN"
"http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head><meta http-equiv="content-type" content="text/html; charset=utf-8" />
<title>[12781] twext/trunk</title>
</head>
<body>
<style type="text/css"><!--
#msg dl.meta { border: 1px #006 solid; background: #369; padding: 6px; color: #fff; }
#msg dl.meta dt { float: left; width: 6em; font-weight: bold; }
#msg dt:after { content:':';}
#msg dl, #msg dt, #msg ul, #msg li, #header, #footer, #logmsg { font-family: verdana,arial,helvetica,sans-serif; font-size: 10pt; }
#msg dl a { font-weight: bold}
#msg dl a:link { color:#fc3; }
#msg dl a:active { color:#ff0; }
#msg dl a:visited { color:#cc6; }
h3 { font-family: verdana,arial,helvetica,sans-serif; font-size: 10pt; font-weight: bold; }
#msg pre { overflow: auto; background: #ffc; border: 1px #fa0 solid; padding: 6px; }
#logmsg { background: #ffc; border: 1px #fa0 solid; padding: 1em 1em 0 1em; }
#logmsg p, #logmsg pre, #logmsg blockquote { margin: 0 0 1em 0; }
#logmsg p, #logmsg li, #logmsg dt, #logmsg dd { line-height: 14pt; }
#logmsg h1, #logmsg h2, #logmsg h3, #logmsg h4, #logmsg h5, #logmsg h6 { margin: .5em 0; }
#logmsg h1:first-child, #logmsg h2:first-child, #logmsg h3:first-child, #logmsg h4:first-child, #logmsg h5:first-child, #logmsg h6:first-child { margin-top: 0; }
#logmsg ul, #logmsg ol { padding: 0; list-style-position: inside; margin: 0 0 0 1em; }
#logmsg ul { text-indent: -1em; padding-left: 1em; }#logmsg ol { text-indent: -1.5em; padding-left: 1.5em; }
#logmsg > ul, #logmsg > ol { margin: 0 0 1em 0; }
#logmsg pre { background: #eee; padding: 1em; }
#logmsg blockquote { border: 1px solid #fa0; border-left-width: 10px; padding: 1em 1em 0 1em; background: white;}
#logmsg dl { margin: 0; }
#logmsg dt { font-weight: bold; }
#logmsg dd { margin: 0; padding: 0 0 0.5em 0; }
#logmsg dd:before { content:'\00bb';}
#logmsg table { border-spacing: 0px; border-collapse: collapse; border-top: 4px solid #fa0; border-bottom: 1px solid #fa0; background: #fff; }
#logmsg table th { text-align: left; font-weight: normal; padding: 0.2em 0.5em; border-top: 1px dotted #fa0; }
#logmsg table td { text-align: right; border-top: 1px dotted #fa0; padding: 0.2em 0.5em; }
#logmsg table thead th { text-align: center; border-bottom: 1px solid #fa0; }
#logmsg table th.Corner { text-align: left; }
#logmsg hr { border: none 0; border-top: 2px dashed #fa0; height: 1px; }
#header, #footer { color: #fff; background: #636; border: 1px #300 solid; padding: 6px; }
#patch { width: 100%; }
#patch h4 {font-family: verdana,arial,helvetica,sans-serif;font-size:10pt;padding:8px;background:#369;color:#fff;margin:0;}
#patch .propset h4, #patch .binary h4 {margin:0;}
#patch pre {padding:0;line-height:1.2em;margin:0;}
#patch .diff {width:100%;background:#eee;padding: 0 0 10px 0;overflow:auto;}
#patch .propset .diff, #patch .binary .diff {padding:10px 0;}
#patch span {display:block;padding:0 10px;}
#patch .modfile, #patch .addfile, #patch .delfile, #patch .propset, #patch .binary, #patch .copfile {border:1px solid #ccc;margin:10px 0;}
#patch ins {background:#dfd;text-decoration:none;display:block;padding:0 10px;}
#patch del {background:#fdd;text-decoration:none;display:block;padding:0 10px;}
#patch .lines, .info {color:#888;background:#fff;}
--></style>
<div id="msg">
<dl class="meta">
<dt>Revision</dt> <dd><a href="http://trac.calendarserver.org/changeset/12781">12781</a></dd>
<dt>Author</dt> <dd>cdaboo@apple.com</dd>
<dt>Date</dt> <dd>2014-03-01 08:17:48 -0800 (Sat, 01 Mar 2014)</dd>
</dl>
<h3>Log Message</h3>
<pre>Merge: New job queue based work queue implementation. Currently this does just what the old queue did, but with
a centralized "job" queue where all work items are listed. Eventually this will be made smarter to
automatically deal with coalescing work, repetitive work, pooled work, as well as eliminate head-of-line
blocking.</pre>
<h3>Modified Paths</h3>
<ul>
<li><a href="#twexttrunktwextenterpriseadbapi2py">twext/trunk/twext/enterprise/adbapi2.py</a></li>
<li><a href="#twexttrunktwextenterprisedalmodelpy">twext/trunk/twext/enterprise/dal/model.py</a></li>
<li><a href="#twexttrunktwextenterprisedalparseschemapy">twext/trunk/twext/enterprise/dal/parseschema.py</a></li>
<li><a href="#twexttrunktwextenterprisedalrecordpy">twext/trunk/twext/enterprise/dal/record.py</a></li>
<li><a href="#twexttrunktwextenterprisedaltesttest_parseschemapy">twext/trunk/twext/enterprise/dal/test/test_parseschema.py</a></li>
<li><a href="#twexttrunktwextenterpriseienterprisepy">twext/trunk/twext/enterprise/ienterprise.py</a></li>
<li><a href="#twexttrunktwextenterprisetesttest_adbapi2py">twext/trunk/twext/enterprise/test/test_adbapi2.py</a></li>
</ul>
<h3>Added Paths</h3>
<ul>
<li><a href="#twexttrunktwextenterprisejobqueuepy">twext/trunk/twext/enterprise/jobqueue.py</a></li>
<li><a href="#twexttrunktwextenterprisetesttest_jobqueuepy">twext/trunk/twext/enterprise/test/test_jobqueue.py</a></li>
</ul>
<h3>Property Changed</h3>
<ul>
<li><a href="#twexttrunk">twext/trunk/</a></li>
<li><a href="#twexttrunktwext">twext/trunk/twext/</a></li>
</ul>
</div>
<div id="patch">
<h3>Diff</h3>
<a id="twexttrunk"></a>
<div class="propset"><h4>Property changes: twext/trunk</h4>
<pre class="diff"><span>
</span></pre></div>
<a id="svnmergeinfo"></a>
<div class="addfile"><h4>Added: svn:mergeinfo</h4></div>
<a id="twexttrunktwext"></a>
<div class="propset"><h4>Property changes: twext/trunk/twext</h4>
<pre class="diff"><span>
</span></pre></div>
<a id="twexttrunktwextsvnmergeinfo"></a>
<div class="modfile"><h4>Modified: svn:mergeinfo</h4></div>
<span class="cx">/CalendarServer/branches/config-separation/twext:4379-4443
</span><span class="cx">/CalendarServer/branches/egg-info-351/twext:4589-4625
</span><span class="cx">/CalendarServer/branches/generic-sqlstore/twext:6167-6191
</span><span class="cx">/CalendarServer/branches/new-store/twext:5594-5934
</span><span class="cx">/CalendarServer/branches/new-store-no-caldavfile/twext:5911-5935
</span><span class="cx">/CalendarServer/branches/new-store-no-caldavfile-2/twext:5936-5981
</span><span class="cx">/CalendarServer/branches/release/CalendarServer-4.3-dev/twext:10180-10190,10192
</span><span class="cx">/CalendarServer/branches/release/CalendarServer-5.1-dev/twext:11846
</span><span class="cx">/CalendarServer/branches/users/cdaboo/batchupload-6699/twext:6700-7198
</span><span class="cx">/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692/twext:5693-5702
</span><span class="cx">/CalendarServer/branches/users/cdaboo/component-set-fixes/twext:8130-8346
</span><span class="cx">/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627/twext:3628-3644
</span><span class="cx">/CalendarServer/branches/users/cdaboo/fix-no-ischedule/twext:11607-11871
</span><span class="cx">/CalendarServer/branches/users/cdaboo/implicituidrace/twext:8137-8141
</span><span class="cx">/CalendarServer/branches/users/cdaboo/ischedule-dkim/twext:9747-9979
</span><span class="cx">/CalendarServer/branches/users/cdaboo/json/twext:11622-11912
</span><span class="cx">/CalendarServer/branches/users/cdaboo/managed-attachments/twext:9985-10145
</span><span class="cx">/CalendarServer/branches/users/cdaboo/more-sharing-5591/twext:5592-5601
</span><span class="cx">/CalendarServer/branches/users/cdaboo/partition-4464/twext:4465-4957
</span><span class="cx">/CalendarServer/branches/users/cdaboo/performance-tweaks/twext:11824-11836
</span><span class="cx">/CalendarServer/branches/users/cdaboo/pods/twext:7297-7377
</span><span class="cx">/CalendarServer/branches/users/cdaboo/pycalendar/twext:7085-7206
</span><span class="cx">/CalendarServer/branches/users/cdaboo/pycard/twext:7227-7237
</span><span class="cx">/CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twext:7740-8287
</span><span class="cx">/CalendarServer/branches/users/cdaboo/relative-config-paths-5070/twext:5071-5105
</span><span class="cx">/CalendarServer/branches/users/cdaboo/reverse-proxy-pods/twext:11875-11900
</span><span class="cx">/CalendarServer/branches/users/cdaboo/shared-calendars-5187/twext:5188-5440
</span><span class="cx">/CalendarServer/branches/users/cdaboo/sharing-in-the-store/twext:11935-12016
</span><span class="cx">/CalendarServer/branches/users/cdaboo/store-scheduling/twext:10876-11129
</span><span class="cx">/CalendarServer/branches/users/cdaboo/timezones/twext:7443-7699
</span><span class="cx">/CalendarServer/branches/users/cdaboo/txn-debugging/twext:8730-8743
</span><span class="cx">/CalendarServer/branches/users/gaya/sharedgroups-3/twext:11088-11204
</span><span class="cx">/CalendarServer/branches/users/glyph/always-abort-txn-on-error/twext:9958-9969
</span><span class="cx">/CalendarServer/branches/users/glyph/case-insensitive-uid/twext:8772-8805
</span><span class="cx">/CalendarServer/branches/users/glyph/conn-limit/twext:6574-6577
</span><span class="cx">/CalendarServer/branches/users/glyph/contacts-server-merge/twext:4971-5080
</span><span class="cx">/CalendarServer/branches/users/glyph/dalify/twext:6932-7023
</span><span class="cx">/CalendarServer/branches/users/glyph/db-reconnect/twext:6824-6876
</span><span class="cx">/CalendarServer/branches/users/glyph/deploybuild/twext:7563-7572
</span><span class="cx">/CalendarServer/branches/users/glyph/digest-auth-redux/twext:10624-10635
</span><span class="cx">/CalendarServer/branches/users/glyph/disable-quota/twext:7718-7727
</span><span class="cx">/CalendarServer/branches/users/glyph/dont-start-postgres/twext:6592-6614
</span><span class="cx">/CalendarServer/branches/users/glyph/enforce-max-requests/twext:11640-11643
</span><span class="cx">/CalendarServer/branches/users/glyph/hang-fix/twext:11465-11491
</span><span class="cx">/CalendarServer/branches/users/glyph/imip-and-admin-html/twext:7866-7984
</span><span class="cx">/CalendarServer/branches/users/glyph/ipv6-client/twext:9054-9105
</span><span class="cx">/CalendarServer/branches/users/glyph/launchd-wrapper-bis/twext:11413-11436
</span><span class="cx">/CalendarServer/branches/users/glyph/linux-tests/twext:6893-6900
</span><span class="cx">/CalendarServer/branches/users/glyph/log-cleanups/twext:11691-11731
</span><span class="cx">/CalendarServer/branches/users/glyph/migrate-merge/twext:8690-8713
</span><span class="cx">/CalendarServer/branches/users/glyph/misc-portability-fixes/twext:7365-7374
</span><span class="cx">/CalendarServer/branches/users/glyph/more-deferreds-6/twext:6322-6368
</span><span class="cx">/CalendarServer/branches/users/glyph/more-deferreds-7/twext:6369-6445
</span><span class="cx">/CalendarServer/branches/users/glyph/multiget-delete/twext:8321-8330
</span><span class="cx">/CalendarServer/branches/users/glyph/new-export/twext:7444-7485
</span><span class="cx">/CalendarServer/branches/users/glyph/one-home-list-api/twext:10048-10073
</span><span class="cx">/CalendarServer/branches/users/glyph/oracle/twext:7106-7155
</span><span class="cx">/CalendarServer/branches/users/glyph/oracle-nulls/twext:7340-7351
</span><span class="cx">/CalendarServer/branches/users/glyph/other-html/twext:8062-8091
</span><span class="cx">/CalendarServer/branches/users/glyph/parallel-sim/twext:8240-8251
</span><span class="cx">/CalendarServer/branches/users/glyph/parallel-upgrade/twext:8376-8400
</span><span class="cx">/CalendarServer/branches/users/glyph/parallel-upgrade_to_1/twext:8571-8583
</span><span class="cx">/CalendarServer/branches/users/glyph/q/twext:9560-9688
</span><span class="cx">/CalendarServer/branches/users/glyph/queue-locking-and-timing/twext:10204-10289
</span><span class="cx">/CalendarServer/branches/users/glyph/quota/twext:7604-7637
</span><span class="cx">/CalendarServer/branches/users/glyph/sendfdport/twext:5388-5424
</span><span class="cx">/CalendarServer/branches/users/glyph/shared-pool-fixes/twext:8436-8443
</span><span class="cx">/CalendarServer/branches/users/glyph/shared-pool-take2/twext:8155-8174
</span><span class="cx">/CalendarServer/branches/users/glyph/sharedpool/twext:6490-6550
</span><span class="cx">/CalendarServer/branches/users/glyph/sharing-api/twext:9192-9205
</span><span class="cx">/CalendarServer/branches/users/glyph/skip-lonely-vtimezones/twext:8524-8535
</span><span class="cx">/CalendarServer/branches/users/glyph/sql-store/twext:5929-6073
</span><span class="cx">/CalendarServer/branches/users/glyph/start-service-start-loop/twext:11060-11065
</span><span class="cx">/CalendarServer/branches/users/glyph/subtransactions/twext:7248-7258
</span><span class="cx">/CalendarServer/branches/users/glyph/table-alias/twext:8651-8664
</span><span class="cx">/CalendarServer/branches/users/glyph/uidexport/twext:7673-7676
</span><span class="cx">/CalendarServer/branches/users/glyph/unshare-when-access-revoked/twext:10562-10595
</span><span class="cx">/CalendarServer/branches/users/glyph/use-system-twisted/twext:5084-5149
</span><span class="cx">/CalendarServer/branches/users/glyph/uuid-normalize/twext:9268-9296
</span><span class="cx">/CalendarServer/branches/users/glyph/warning-cleanups/twext:11347-11357
</span><span class="cx">/CalendarServer/branches/users/glyph/whenNotProposed/twext:11881-11897
</span><span class="cx">/CalendarServer/branches/users/glyph/xattrs-from-files/twext:7757-7769
</span><span class="cx">/CalendarServer/branches/users/sagen/applepush/twext:8126-8184
</span><span class="cx">/CalendarServer/branches/users/sagen/inboxitems/twext:7380-7381
</span><span class="cx">/CalendarServer/branches/users/sagen/locations-resources/twext:5032-5051
</span><span class="cx">/CalendarServer/branches/users/sagen/locations-resources-2/twext:5052-5061
</span><span class="cx">/CalendarServer/branches/users/sagen/purge_old_events/twext:6735-6746
</span><span class="cx">/CalendarServer/branches/users/sagen/resource-delegates-4038/twext:4040-4067
</span><span class="cx">/CalendarServer/branches/users/sagen/resource-delegates-4066/twext:4068-4075
</span><span class="cx">/CalendarServer/branches/users/sagen/resources-2/twext:5084-5093
</span><span class="cx">/CalendarServer/branches/users/sagen/testing/twext:10827-10851,10853-10855
</span><span class="cx">/CalendarServer/branches/users/wsanchez/transations/twext:5515-5593
</span><span class="cx"> + /CalDAVTester/trunk/twext:11193-11198
</span><span class="cx">/CalendarServer/branches/config-separation/twext:4379-4443
</span><span class="cx">/CalendarServer/branches/egg-info-351/twext:4589-4625
</span><span class="cx">/CalendarServer/branches/generic-sqlstore/twext:6167-6191
</span><span class="cx">/CalendarServer/branches/new-store-no-caldavfile-2/twext:5936-5981
</span><span class="cx">/CalendarServer/branches/new-store-no-caldavfile/twext:5911-5935
</span><span class="cx">/CalendarServer/branches/new-store/twext:5594-5934
</span><span class="cx">/CalendarServer/branches/release/CalendarServer-4.3-dev/twext:10180-10190,10192
</span><span class="cx">/CalendarServer/branches/release/CalendarServer-5.1-dev/twext:11846
</span><span class="cx">/CalendarServer/branches/users/cdaboo/batchupload-6699/twext:6700-7198
</span><span class="cx">/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692/twext:5693-5702
</span><span class="cx">/CalendarServer/branches/users/cdaboo/component-set-fixes/twext:8130-8346
</span><span class="cx">/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627/twext:3628-3644
</span><span class="cx">/CalendarServer/branches/users/cdaboo/fix-no-ischedule/twext:11607-11871
</span><span class="cx">/CalendarServer/branches/users/cdaboo/implicituidrace/twext:8137-8141
</span><span class="cx">/CalendarServer/branches/users/cdaboo/ischedule-dkim/twext:9747-9979
</span><span class="cx">/CalendarServer/branches/users/cdaboo/json/twext:11622-11912
</span><span class="cx">/CalendarServer/branches/users/cdaboo/managed-attachments/twext:9985-10145
</span><span class="cx">/CalendarServer/branches/users/cdaboo/more-sharing-5591/twext:5592-5601
</span><span class="cx">/CalendarServer/branches/users/cdaboo/partition-4464/twext:4465-4957
</span><span class="cx">/CalendarServer/branches/users/cdaboo/performance-tweaks/twext:11824-11836
</span><span class="cx">/CalendarServer/branches/users/cdaboo/pods/twext:7297-7377
</span><span class="cx">/CalendarServer/branches/users/cdaboo/pycalendar/twext:7085-7206
</span><span class="cx">/CalendarServer/branches/users/cdaboo/pycard/twext:7227-7237
</span><span class="cx">/CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twext:7740-8287
</span><span class="cx">/CalendarServer/branches/users/cdaboo/relative-config-paths-5070/twext:5071-5105
</span><span class="cx">/CalendarServer/branches/users/cdaboo/reverse-proxy-pods/twext:11875-11900
</span><span class="cx">/CalendarServer/branches/users/cdaboo/shared-calendars-5187/twext:5188-5440
</span><span class="cx">/CalendarServer/branches/users/cdaboo/sharing-in-the-store/twext:11935-12016
</span><span class="cx">/CalendarServer/branches/users/cdaboo/store-scheduling/twext:10876-11129
</span><span class="cx">/CalendarServer/branches/users/cdaboo/timezones/twext:7443-7699
</span><span class="cx">/CalendarServer/branches/users/cdaboo/txn-debugging/twext:8730-8743
</span><span class="cx">/CalendarServer/branches/users/gaya/sharedgroups-3/twext:11088-11204
</span><span class="cx">/CalendarServer/branches/users/glyph/always-abort-txn-on-error/twext:9958-9969
</span><span class="cx">/CalendarServer/branches/users/glyph/case-insensitive-uid/twext:8772-8805
</span><span class="cx">/CalendarServer/branches/users/glyph/conn-limit/twext:6574-6577
</span><span class="cx">/CalendarServer/branches/users/glyph/contacts-server-merge/twext:4971-5080
</span><span class="cx">/CalendarServer/branches/users/glyph/dalify/twext:6932-7023
</span><span class="cx">/CalendarServer/branches/users/glyph/db-reconnect/twext:6824-6876
</span><span class="cx">/CalendarServer/branches/users/glyph/deploybuild/twext:7563-7572
</span><span class="cx">/CalendarServer/branches/users/glyph/digest-auth-redux/twext:10624-10635
</span><span class="cx">/CalendarServer/branches/users/glyph/disable-quota/twext:7718-7727
</span><span class="cx">/CalendarServer/branches/users/glyph/dont-start-postgres/twext:6592-6614
</span><span class="cx">/CalendarServer/branches/users/glyph/enforce-max-requests/twext:11640-11643
</span><span class="cx">/CalendarServer/branches/users/glyph/hang-fix/twext:11465-11491
</span><span class="cx">/CalendarServer/branches/users/glyph/imip-and-admin-html/twext:7866-7984
</span><span class="cx">/CalendarServer/branches/users/glyph/ipv6-client/twext:9054-9105
</span><span class="cx">/CalendarServer/branches/users/glyph/launchd-wrapper-bis/twext:11413-11436
</span><span class="cx">/CalendarServer/branches/users/glyph/linux-tests/twext:6893-6900
</span><span class="cx">/CalendarServer/branches/users/glyph/log-cleanups/twext:11691-11731
</span><span class="cx">/CalendarServer/branches/users/glyph/migrate-merge/twext:8690-8713
</span><span class="cx">/CalendarServer/branches/users/glyph/misc-portability-fixes/twext:7365-7374
</span><span class="cx">/CalendarServer/branches/users/glyph/more-deferreds-6/twext:6322-6368
</span><span class="cx">/CalendarServer/branches/users/glyph/more-deferreds-7/twext:6369-6445
</span><span class="cx">/CalendarServer/branches/users/glyph/multiget-delete/twext:8321-8330
</span><span class="cx">/CalendarServer/branches/users/glyph/new-export/twext:7444-7485
</span><span class="cx">/CalendarServer/branches/users/glyph/one-home-list-api/twext:10048-10073
</span><span class="cx">/CalendarServer/branches/users/glyph/oracle-nulls/twext:7340-7351
</span><span class="cx">/CalendarServer/branches/users/glyph/oracle/twext:7106-7155
</span><span class="cx">/CalendarServer/branches/users/glyph/other-html/twext:8062-8091
</span><span class="cx">/CalendarServer/branches/users/glyph/parallel-sim/twext:8240-8251
</span><span class="cx">/CalendarServer/branches/users/glyph/parallel-upgrade/twext:8376-8400
</span><span class="cx">/CalendarServer/branches/users/glyph/parallel-upgrade_to_1/twext:8571-8583
</span><span class="cx">/CalendarServer/branches/users/glyph/q/twext:9560-9688
</span><span class="cx">/CalendarServer/branches/users/glyph/queue-locking-and-timing/twext:10204-10289
</span><span class="cx">/CalendarServer/branches/users/glyph/quota/twext:7604-7637
</span><span class="cx">/CalendarServer/branches/users/glyph/sendfdport/twext:5388-5424
</span><span class="cx">/CalendarServer/branches/users/glyph/shared-pool-fixes/twext:8436-8443
</span><span class="cx">/CalendarServer/branches/users/glyph/shared-pool-take2/twext:8155-8174
</span><span class="cx">/CalendarServer/branches/users/glyph/sharedpool/twext:6490-6550
</span><span class="cx">/CalendarServer/branches/users/glyph/sharing-api/twext:9192-9205
</span><span class="cx">/CalendarServer/branches/users/glyph/skip-lonely-vtimezones/twext:8524-8535
</span><span class="cx">/CalendarServer/branches/users/glyph/sql-store/twext:5929-6073
</span><span class="cx">/CalendarServer/branches/users/glyph/start-service-start-loop/twext:11060-11065
</span><span class="cx">/CalendarServer/branches/users/glyph/subtransactions/twext:7248-7258
</span><span class="cx">/CalendarServer/branches/users/glyph/table-alias/twext:8651-8664
</span><span class="cx">/CalendarServer/branches/users/glyph/uidexport/twext:7673-7676
</span><span class="cx">/CalendarServer/branches/users/glyph/unshare-when-access-revoked/twext:10562-10595
</span><span class="cx">/CalendarServer/branches/users/glyph/use-system-twisted/twext:5084-5149
</span><span class="cx">/CalendarServer/branches/users/glyph/uuid-normalize/twext:9268-9296
</span><span class="cx">/CalendarServer/branches/users/glyph/warning-cleanups/twext:11347-11357
</span><span class="cx">/CalendarServer/branches/users/glyph/whenNotProposed/twext:11881-11897
</span><span class="cx">/CalendarServer/branches/users/glyph/xattrs-from-files/twext:7757-7769
</span><span class="cx">/CalendarServer/branches/users/sagen/applepush/twext:8126-8184
</span><span class="cx">/CalendarServer/branches/users/sagen/inboxitems/twext:7380-7381
</span><span class="cx">/CalendarServer/branches/users/sagen/locations-resources-2/twext:5052-5061
</span><span class="cx">/CalendarServer/branches/users/sagen/locations-resources/twext:5032-5051
</span><span class="cx">/CalendarServer/branches/users/sagen/purge_old_events/twext:6735-6746
</span><span class="cx">/CalendarServer/branches/users/sagen/resource-delegates-4038/twext:4040-4067
</span><span class="cx">/CalendarServer/branches/users/sagen/resource-delegates-4066/twext:4068-4075
</span><span class="cx">/CalendarServer/branches/users/sagen/resources-2/twext:5084-5093
</span><span class="cx">/CalendarServer/branches/users/sagen/testing/twext:10827-10851,10853-10855
</span><span class="cx">/CalendarServer/branches/users/wsanchez/transations/twext:5515-5593
</span><span class="cx">/twext/branches/users/cdaboo/jobs/twext:12742-12780
</span><a id="twexttrunktwextenterpriseadbapi2py"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/adbapi2.py (12780 => 12781)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/adbapi2.py        2014-03-01 15:45:25 UTC (rev 12780)
+++ twext/trunk/twext/enterprise/adbapi2.py        2014-03-01 16:17:48 UTC (rev 12781)
</span><span class="lines">@@ -155,12 +155,12 @@
</span><span class="cx"> noisy = False
</span><span class="cx">
</span><span class="cx"> def __init__(self, pool, threadHolder, connection, cursor):
</span><del>- self._pool = pool
- self._completed = "idle"
- self._cursor = cursor
</del><ins>+ self._pool = pool
+ self._completed = "idle"
+ self._cursor = cursor
</ins><span class="cx"> self._connection = connection
</span><del>- self._holder = threadHolder
- self._first = True
</del><ins>+ self._holder = threadHolder
+ self._first = True
</ins><span class="cx">
</span><span class="cx">
</span><span class="cx"> @_forward
</span><span class="lines">@@ -169,14 +169,12 @@
</span><span class="cx"> The paramstyle attribute is mirrored from the connection pool.
</span><span class="cx"> """
</span><span class="cx">
</span><del>-
</del><span class="cx"> @_forward
</span><span class="cx"> def dialect(self):
</span><span class="cx"> """
</span><span class="cx"> The dialect attribute is mirrored from the connection pool.
</span><span class="cx"> """
</span><span class="cx">
</span><del>-
</del><span class="cx"> def _reallyExecSQL(self, sql, args=None, raiseOnZeroRowCount=None):
</span><span class="cx"> """
</span><span class="cx"> Execute the given SQL on a thread, using a DB-API 2.0 cursor.
</span><span class="lines">@@ -280,7 +278,7 @@
</span><span class="cx"> # that we cannot workaround or address automatically, so no
</span><span class="cx"> # try:except: for them.
</span><span class="cx"> self._connection = self._pool.connectionFactory()
</span><del>- self._cursor = self._connection.cursor()
</del><ins>+ self._cursor = self._connection.cursor()
</ins><span class="cx">
</span><span class="cx"> # Note that although this method is being invoked recursively,
</span><span class="cx"> # the "_first" flag is re-set at the very top, so we will _not_
</span><span class="lines">@@ -382,9 +380,9 @@
</span><span class="cx"> transaction.
</span><span class="cx"> """
</span><span class="cx"> self._completed = "released"
</span><del>- self._stopped = True
- holder = self._holder
- self._holder = None
</del><ins>+ self._stopped = True
+ holder = self._holder
+ self._holder = None
</ins><span class="cx">
</span><span class="cx"> def _reallyClose():
</span><span class="cx"> if self._cursor is None:
</span><span class="lines">@@ -417,8 +415,8 @@
</span><span class="cx"> return fail(ConnectionError(self.reason))
</span><span class="cx">
</span><span class="cx"> execSQL = _everything
</span><del>- commit = _everything
- abort = _everything
</del><ins>+ commit = _everything
+ abort = _everything
</ins><span class="cx">
</span><span class="cx">
</span><span class="cx">
</span><span class="lines">@@ -593,12 +591,12 @@
</span><span class="cx">
</span><span class="cx"> def __init__(self, pool, baseTxn):
</span><span class="cx"> super(_SingleTxn, self).__init__()
</span><del>- self._pool = pool
- self._baseTxn = baseTxn
- self._completed = False
- self._currentBlock = None
- self._blockedQueue = None
- self._pendingBlocks = []
</del><ins>+ self._pool = pool
+ self._baseTxn = baseTxn
+ self._completed = False
+ self._currentBlock = None
+ self._blockedQueue = None
+ self._pendingBlocks = []
</ins><span class="cx"> self._stillExecuting = []
</span><span class="cx">
</span><span class="cx">
</span><span class="lines">@@ -615,7 +613,7 @@
</span><span class="cx"> implementation of L{IAsyncTransaction} that will actually do the work;
</span><span class="cx"> either a L{_ConnectedTxn} or a L{_NoTxn}.
</span><span class="cx"> """
</span><del>- spooledBase = self._baseTxn
</del><ins>+ spooledBase = self._baseTxn
</ins><span class="cx"> self._baseTxn = baseTxn
</span><span class="cx"> spooledBase._unspool(baseTxn)
</span><span class="cx">
</span><span class="lines">@@ -892,8 +890,8 @@
</span><span class="cx"> and subsequent SQL executions for this connection.
</span><span class="cx"> @type holder: L{ThreadHolder}
</span><span class="cx"> """
</span><del>- self._pool = pool
- self._holder = holder
</del><ins>+ self._pool = pool
+ self._holder = holder
</ins><span class="cx"> self._aborted = False
</span><span class="cx">
</span><span class="cx">
</span><span class="lines">@@ -992,11 +990,11 @@
</span><span class="cx"> if name is not None:
</span><span class="cx"> self.name = name
</span><span class="cx">
</span><del>- self._free = []
- self._busy = []
- self._waiting = []
- self._finishing = []
- self._stopping = False
</del><ins>+ self._free = []
+ self._busy = []
+ self._waiting = []
+ self._finishing = []
+ self._stopping = False
</ins><span class="cx">
</span><span class="cx">
</span><span class="cx"> def startService(self):
</span><span class="lines">@@ -1107,7 +1105,7 @@
</span><span class="cx"> # support threadlevel=1; we can't necessarily cursor() in a
</span><span class="cx"> # different thread than we do transactions in.
</span><span class="cx"> connection = self.connectionFactory()
</span><del>- cursor = connection.cursor()
</del><ins>+ cursor = connection.cursor()
</ins><span class="cx"> return (connection, cursor)
</span><span class="cx">
</span><span class="cx"> def finishInit((connection, cursor)):
</span><span class="lines">@@ -1364,8 +1362,8 @@
</span><span class="cx"> Initialize a mapping of transaction IDs to transaction objects.
</span><span class="cx"> """
</span><span class="cx"> super(ConnectionPoolConnection, self).__init__()
</span><del>- self.pool = pool
- self._txns = {}
</del><ins>+ self.pool = pool
+ self._txns = {}
</ins><span class="cx"> self._blocks = {}
</span><span class="cx">
</span><span class="cx">
</span><span class="lines">@@ -1479,10 +1477,10 @@
</span><span class="cx"> ):
</span><span class="cx"> # See DEFAULT_PARAM_STYLE FIXME above.
</span><span class="cx"> super(ConnectionPoolClient, self).__init__()
</span><del>- self._nextID = count().next
- self._txns = weakref.WeakValueDictionary()
- self._queries = {}
- self.dialect = dialect
</del><ins>+ self._nextID = count().next
+ self._txns = weakref.WeakValueDictionary()
+ self._queries = {}
+ self.dialect = dialect
</ins><span class="cx"> self.paramstyle = paramstyle
</span><span class="cx">
</span><span class="cx">
</span><span class="lines">@@ -1507,8 +1505,8 @@
</span><span class="cx">
</span><span class="cx"> @rtype: L{IAsyncTransaction}
</span><span class="cx"> """
</span><del>- txnid = str(self._nextID())
- txn = _NetTransaction(client=self, transactionID=txnid)
</del><ins>+ txnid = str(self._nextID())
+ txn = _NetTransaction(client=self, transactionID=txnid)
</ins><span class="cx"> self._txns[txnid] = txn
</span><span class="cx"> self.callRemote(StartTxn, transactionID=txnid)
</span><span class="cx"> return txn
</span><span class="lines">@@ -1529,10 +1527,10 @@
</span><span class="cx">
</span><span class="cx"> class _Query(object):
</span><span class="cx"> def __init__(self, sql, raiseOnZeroRowCount, args):
</span><del>- self.sql = sql
- self.args = args
- self.results = []
- self.deferred = Deferred()
</del><ins>+ self.sql = sql
+ self.args = args
+ self.results = []
+ self.deferred = Deferred()
</ins><span class="cx"> self.raiseOnZeroRowCount = raiseOnZeroRowCount
</span><span class="cx">
</span><span class="cx">
</span><span class="lines">@@ -1606,11 +1604,11 @@
</span><span class="cx"> transaction identifier.
</span><span class="cx"> """
</span><span class="cx"> super(_NetTransaction, self).__init__()
</span><del>- self._client = client
</del><ins>+ self._client = client
</ins><span class="cx"> self._transactionID = transactionID
</span><del>- self._completed = False
- self._committing = False
- self._committed = False
</del><ins>+ self._completed = False
+ self._committing = False
+ self._committed = False
</ins><span class="cx">
</span><span class="cx">
</span><span class="cx"> @property
</span></span></pre></div>
<a id="twexttrunktwextenterprisedalmodelpy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/dal/model.py (12780 => 12781)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/dal/model.py        2014-03-01 15:45:25 UTC (rev 12780)
+++ twext/trunk/twext/enterprise/dal/model.py        2014-03-01 16:17:48 UTC (rev 12781)
</span><span class="lines">@@ -30,6 +30,7 @@
</span><span class="cx"> "Index",
</span><span class="cx"> "PseudoIndex",
</span><span class="cx"> "Sequence",
</span><ins>+ "Function",
</ins><span class="cx"> "Schema",
</span><span class="cx"> ]
</span><span class="cx">
</span><span class="lines">@@ -520,6 +521,36 @@
</span><span class="cx">
</span><span class="cx">
</span><span class="cx">
</span><ins>+class Function(FancyEqMixin, object):
+ """
+ A function object.
+ """
+
+ compareAttributes = "name".split()
+
+ def __init__(self, schema, name):
+ _checkstr(name)
+ self.name = name
+ schema.functions.append(self)
+
+
+ def __repr__(self):
+ return "<Function %r>" % (self.name,)
+
+
+ def compare(self, other):
+ """
+ Return the differences between two functions.
+
+ @param other: the function to compare with
+ @type other: L{Function}
+ """
+
+ # TODO: ought to compare function body but we don't track that
+ return []
+
+
+
</ins><span class="cx"> def _namedFrom(name, sequence):
</span><span class="cx"> """
</span><span class="cx"> Retrieve an item with a given name attribute from a given sequence, or
</span><span class="lines">@@ -542,6 +573,7 @@
</span><span class="cx"> self.tables = []
</span><span class="cx"> self.indexes = []
</span><span class="cx"> self.sequences = []
</span><ins>+ self.functions = []
</ins><span class="cx">
</span><span class="cx">
</span><span class="cx"> def __repr__(self):
</span><span class="lines">@@ -580,6 +612,7 @@
</span><span class="cx"> _compareLists(self.tables, other.tables, "table")
</span><span class="cx"> _compareLists(self.pseudoIndexes(), other.pseudoIndexes(), "index")
</span><span class="cx"> _compareLists(self.sequences, other.sequences, "sequence")
</span><ins>+ _compareLists(self.functions, other.functions, "functions")
</ins><span class="cx">
</span><span class="cx"> return results
</span><span class="cx">
</span><span class="lines">@@ -621,3 +654,7 @@
</span><span class="cx">
</span><span class="cx"> def indexNamed(self, name):
</span><span class="cx"> return _namedFrom(name, self.indexes)
</span><ins>+
+
+ def functionNamed(self, name):
+ return _namedFrom(name, self.functions)
</ins></span></pre></div>
<a id="twexttrunktwextenterprisedalparseschemapy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/dal/parseschema.py (12780 => 12781)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/dal/parseschema.py        2014-03-01 15:45:25 UTC (rev 12780)
+++ twext/trunk/twext/enterprise/dal/parseschema.py        2014-03-01 16:17:48 UTC (rev 12781)
</span><span class="lines">@@ -43,7 +43,7 @@
</span><span class="cx"> Function, Comparison)
</span><span class="cx">
</span><span class="cx"> from twext.enterprise.dal.model import (
</span><del>- Schema, Table, SQLType, ProcedureCall, Constraint, Sequence, Index)
</del><ins>+ Schema, Table, SQLType, ProcedureCall, Constraint, Sequence, Index, Function as FunctionModel)
</ins><span class="cx">
</span><span class="cx"> from twext.enterprise.dal.syntax import (
</span><span class="cx"> ColumnSyntax, CompoundComparison, Constant, Function as FunctionSyntax
</span><span class="lines">@@ -208,6 +208,12 @@
</span><span class="cx"> columnName = nameOrIdentifier(token)
</span><span class="cx"> idx.addColumn(idx.table.columnNamed(columnName))
</span><span class="cx">
</span><ins>+ elif createType == u"FUNCTION":
+ FunctionModel(
+ schema,
+ stmt.token_next(2, True).value.encode("utf-8")
+ )
+
</ins><span class="cx"> elif stmt.get_type() == "INSERT":
</span><span class="cx"> insertTokens = iterSignificant(stmt)
</span><span class="cx"> expect(insertTokens, ttype=Keyword.DML, value="INSERT")
</span><span class="lines">@@ -234,6 +240,15 @@
</span><span class="cx">
</span><span class="cx"> schema.tableNamed(tableName).insertSchemaRow(rowData)
</span><span class="cx">
</span><ins>+ elif stmt.get_type() == "CREATE OR REPLACE":
+ createType = stmt.token_next(1, True).value.upper()
+
+ if createType == u"FUNCTION":
+ FunctionModel(
+ schema,
+ stmt.token_next(2, True).token_first(True).token_first(True).value.encode("utf-8")
+ )
+
</ins><span class="cx"> else:
</span><span class="cx"> print("unknown type:", stmt.get_type())
</span><span class="cx">
</span></span></pre></div>
<a id="twexttrunktwextenterprisedalrecordpy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/dal/record.py (12780 => 12781)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/dal/record.py        2014-03-01 15:45:25 UTC (rev 12780)
+++ twext/trunk/twext/enterprise/dal/record.py        2014-03-01 16:17:48 UTC (rev 12781)
</span><span class="lines">@@ -171,6 +171,23 @@
</span><span class="cx"> return r
</span><span class="cx">
</span><span class="cx">
</span><ins>+ @classmethod
+ def fromTable(cls, table):
+ """
+ Initialize from a L{Table} at run time.
+
+ @param table: table containing the record data
+ @type table: L{Table}
+ """
+ cls.__attrmap__ = {}
+ cls.__colmap__ = {}
+ allColumns = list(table)
+ for column in allColumns:
+ attrname = cls.namingConvention(column.model.name)
+ cls.__attrmap__[attrname] = column
+ cls.__colmap__[column] = attrname
+
+
</ins><span class="cx"> @staticmethod
</span><span class="cx"> def namingConvention(columnName):
</span><span class="cx"> """
</span><span class="lines">@@ -274,7 +291,7 @@
</span><span class="cx"> """
</span><span class="cx"> for setAttribute, setValue in attributeList:
</span><span class="cx"> setColumn = self.__attrmap__[setAttribute]
</span><del>- if setColumn.model.type.name == "timestamp":
</del><ins>+ if setColumn.model.type.name == "timestamp" and setValue is not None:
</ins><span class="cx"> setValue = parseSQLTimestamp(setValue)
</span><span class="cx"> setattr(self, setAttribute, setValue)
</span><span class="cx">
</span><span class="lines">@@ -335,7 +352,7 @@
</span><span class="cx">
</span><span class="cx">
</span><span class="cx"> @classmethod
</span><del>- def query(cls, transaction, expr, order=None, ascending=True, group=None):
</del><ins>+ def query(cls, transaction, expr, order=None, ascending=True, group=None, forUpdate=False, noWait=False):
</ins><span class="cx"> """
</span><span class="cx"> Query the table that corresponds to C{cls}, and return instances of
</span><span class="cx"> C{cls} corresponding to the rows that are returned from that table.
</span><span class="lines">@@ -353,15 +370,29 @@
</span><span class="cx">
</span><span class="cx"> @param group: a L{ColumnSyntax} to group the resulting record objects
</span><span class="cx"> by.
</span><ins>+
+ @param forUpdate: do a SELECT ... FOR UPDATE
+ @type forUpdate: L{bool}
+ @param noWait: include NOWAIT with the FOR UPDATE
+ @type noWait: L{bool}
</ins><span class="cx"> """
</span><span class="cx"> kw = {}
</span><span class="cx"> if order is not None:
</span><span class="cx"> kw.update(OrderBy=order, Ascending=ascending)
</span><span class="cx"> if group is not None:
</span><span class="cx"> kw.update(GroupBy=group)
</span><ins>+ if forUpdate:
+ kw.update(ForUpdate=True)
+ if noWait:
+ kw.update(NoWait=True)
</ins><span class="cx"> return cls._rowsFromQuery(
</span><span class="cx"> transaction,
</span><del>- Select(list(cls.table), From=cls.table, Where=expr, **kw),
</del><ins>+ Select(
+ list(cls.table),
+ From=cls.table,
+ Where=expr,
+ **kw
+ ),
</ins><span class="cx"> None
</span><span class="cx"> )
</span><span class="cx">
</span></span></pre></div>
<a id="twexttrunktwextenterprisedaltesttest_parseschemapy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/dal/test/test_parseschema.py (12780 => 12781)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/dal/test/test_parseschema.py        2014-03-01 15:45:25 UTC (rev 12780)
+++ twext/trunk/twext/enterprise/dal/test/test_parseschema.py        2014-03-01 16:17:48 UTC (rev 12781)
</span><span class="lines">@@ -403,3 +403,27 @@
</span><span class="cx"> "z-unique:(c)",
</span><span class="cx"> ))
</span><span class="cx"> )
</span><ins>+
+
+ def test_functions(self):
+ """
+ A 'create (or replace) function' statement will add an L{Function} object to a L{Schema}'s
+ C{functions} list.
+ """
+ s = self.schemaFromString(
+ """
+CREATE FUNCTION increment(i integer) RETURNS integer AS $$
+BEGIN
+ RETURN i + 1;
+END;
+$$ LANGUAGE plpgsql;
+CREATE OR REPLACE FUNCTION decrement(i integer) RETURNS integer AS $$
+BEGIN
+ RETURN i - 1;
+END;
+$$ LANGUAGE plpgsql;
+ """
+ )
+ self.assertTrue(s.functionNamed("increment") is not None)
+ self.assertTrue(s.functionNamed("decrement") is not None)
+ self.assertRaises(KeyError, s.functionNamed, "merge")
</ins></span></pre></div>
<a id="twexttrunktwextenterpriseienterprisepy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/ienterprise.py (12780 => 12781)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/ienterprise.py        2014-03-01 15:45:25 UTC (rev 12780)
+++ twext/trunk/twext/enterprise/ienterprise.py        2014-03-01 16:17:48 UTC (rev 12781)
</span><span class="lines">@@ -115,7 +115,6 @@
</span><span class="cx"> transaction has already been committed or rolled back.
</span><span class="cx"> """
</span><span class="cx">
</span><del>-
</del><span class="cx"> def preCommit(operation):
</span><span class="cx"> """
</span><span class="cx"> Perform the given operation when this L{IAsyncTransaction}'s C{commit}
</span><span class="lines">@@ -128,7 +127,6 @@
</span><span class="cx"> will not fire until that L{Deferred} fires.
</span><span class="cx"> """
</span><span class="cx">
</span><del>-
</del><span class="cx"> def postCommit(operation):
</span><span class="cx"> """
</span><span class="cx"> Perform the given operation only after this L{IAsyncTransaction}
</span><span class="lines">@@ -140,7 +138,6 @@
</span><span class="cx"> will not fire until that L{Deferred} fires.
</span><span class="cx"> """
</span><span class="cx">
</span><del>-
</del><span class="cx"> def abort():
</span><span class="cx"> """
</span><span class="cx"> Roll back changes caused by this transaction.
</span><span class="lines">@@ -149,7 +146,6 @@
</span><span class="cx"> rollback of this transaction.
</span><span class="cx"> """
</span><span class="cx">
</span><del>-
</del><span class="cx"> def postAbort(operation):
</span><span class="cx"> """
</span><span class="cx"> Invoke a callback after abort.
</span><span class="lines">@@ -160,7 +156,6 @@
</span><span class="cx"> L{Deferred}.
</span><span class="cx"> """
</span><span class="cx">
</span><del>-
</del><span class="cx"> def commandBlock():
</span><span class="cx"> """
</span><span class="cx"> Create an object which will cause the commands executed on it to be
</span><span class="lines">@@ -257,7 +252,6 @@
</span><span class="cx"> @return: the concrete value which should be passed to the DB-API layer.
</span><span class="cx"> """
</span><span class="cx">
</span><del>-
</del><span class="cx"> def postQuery(cursor):
</span><span class="cx"> """
</span><span class="cx"> After running a query, invoke this method in the DB-API thread.
</span><span class="lines">@@ -288,17 +282,16 @@
</span><span class="cx">
</span><span class="cx"> @param workItemType: the type of work item to create.
</span><span class="cx"> @type workItemType: L{type}, specifically, a subtype of L{WorkItem
</span><del>- <twext.enterprise.queue.WorkItem>}
</del><ins>+ <twext.enterprise.jobqueue.WorkItem>}
</ins><span class="cx">
</span><span class="cx"> @param kw: The keyword parameters are relayed to C{workItemType.create}
</span><span class="cx"> to create an appropriately initialized item.
</span><span class="cx">
</span><span class="cx"> @return: a work proposal that allows tracking of the various phases of
</span><span class="cx"> completion of the work item.
</span><del>- @rtype: L{twext.enterprise.queue.WorkItem}
</del><ins>+ @rtype: L{twext.enterprise.jobqueue.WorkItem}
</ins><span class="cx"> """
</span><span class="cx">
</span><del>-
</del><span class="cx"> def callWithNewProposals(self, callback):
</span><span class="cx"> """
</span><span class="cx"> Tells the IQueuer to call a callback method whenever a new WorkProposal
</span><span class="lines">@@ -308,7 +301,6 @@
</span><span class="cx"> L{WorkProposal}
</span><span class="cx"> """
</span><span class="cx">
</span><del>-
</del><span class="cx"> def transferProposalCallbacks(self, newQueuer):
</span><span class="cx"> """
</span><span class="cx"> Transfer the registered callbacks to the new queuer.
</span></span></pre></div>
<a id="twexttrunktwextenterprisejobqueuepyfromrev12780twextbranchesuserscdaboojobstwextenterprisejobqueuepy"></a>
<div class="copfile"><h4>Copied: twext/trunk/twext/enterprise/jobqueue.py (from rev 12780, twext/branches/users/cdaboo/jobs/twext/enterprise/jobqueue.py) (0 => 12781)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/jobqueue.py         (rev 0)
+++ twext/trunk/twext/enterprise/jobqueue.py        2014-03-01 16:17:48 UTC (rev 12781)
</span><span class="lines">@@ -0,0 +1,1592 @@
</span><ins>+# -*- test-case-name: twext.enterprise.test.test_queue -*-
+##
+# Copyright (c) 2012-2014 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+
+"""
+L{twext.enterprise.jobqueue} is an U{eventually consistent
+<https://en.wikipedia.org/wiki/Eventual_consistency>} task-queuing system for
+use by applications with multiple front-end servers talking to a single
+database instance, that want to defer and parallelize work that involves
+storing the results of computation.
+
+By enqueuing with L{twext.enterprise.jobqueue}, you may guarantee that the work
+will I{eventually} be done, and reliably commit to doing it in the future, but
+defer it if it does not need to be done I{now}.
+
+To pick a hypothetical example, let's say that you have a store which wants to
+issue a promotional coupon based on a customer loyalty program, in response to
+an administrator clicking on a button. Determining the list of customers to
+send the coupon to is quick: a simple query will get you all their names.
+However, analyzing each user's historical purchase data is (A) time consuming
+and (B) relatively isolated, so it would be good to do that in parallel, and it
+would also be acceptable to have that happen at a later time, outside the
+critical path.
+
+Such an application might be implemented with this queuing system like so::
+
+ from twext.enterprise.jobqueue import WorkItem, queueFromTransaction
+ from twext.enterprise.dal.parseschema import addSQLToSchema
+ from twext.enterprise.dal.syntax import SchemaSyntax
+
+ schemaModel = Schema()
+ addSQLToSchema('''
+ create table CUSTOMER (NAME varchar(255), ID integer primary key);
+ create table PRODUCT (NAME varchar(255), ID integer primary key);
+ create table PURCHASE (NAME varchar(255), WHEN timestamp,
+ CUSTOMER_ID integer references CUSTOMER,
+ PRODUCT_ID integer references PRODUCT);
+ create table COUPON_WORK (WORK_ID integer primary key,
+ CUSTOMER_ID integer references CUSTOMER);
+ create table COUPON (ID integer primary key,
+ CUSTOMER_ID integer references customer,
+ AMOUNT integer);
+ ''')
+ schema = SchemaSyntax(schemaModel)
+
+ class Coupon(Record, fromTable(schema.COUPON)):
+ pass
+
+ class CouponWork(WorkItem, fromTable(schema.COUPON_WORK)):
+ @inlineCallbacks
+ def doWork(self):
+ purchases = yield Select(schema.PURCHASE,
+ Where=schema.PURCHASE.CUSTOMER_ID
+ == self.customerID).on(self.transaction)
+ couponAmount = yield doSomeMathThatTakesAWhile(purchases)
+ yield Coupon.create(customerID=self.customerID,
+ amount=couponAmount)
+
+ @inlineCallbacks
+ def makeSomeCoupons(txn):
+ # Note, txn was started before, will be committed later...
+ for customerID in (yield Select([schema.CUSTOMER.ID],
+ From=schema.CUSTOMER).on(txn)):
+ # queuer is a provider of IQueuer, of which there are several
+ # implementations in this module.
+ queuer.enqueueWork(txn, CouponWork, customerID=customerID)
+"""
+
+from functools import wraps
+from datetime import datetime
+
+from zope.interface import implements
+
+from twisted.application.service import MultiService
+from twisted.internet.protocol import Factory
+from twisted.internet.defer import (
+ inlineCallbacks, returnValue, Deferred, passthru, succeed
+)
+from twisted.internet.endpoints import TCP4ClientEndpoint
+from twisted.protocols.amp import AMP, Command, Integer, String
+from twisted.python.reflect import qual
+from twisted.python import log
+
+from twext.enterprise.dal.syntax import SchemaSyntax, Lock, NamedValue, Select, \
+ Count
+
+from twext.enterprise.dal.model import ProcedureCall
+from twext.enterprise.dal.record import Record, fromTable, NoSuchRecord
+from twisted.python.failure import Failure
+
+from twext.enterprise.dal.model import Table, Schema, SQLType, Constraint
+from twisted.internet.endpoints import TCP4ServerEndpoint
+from twext.enterprise.ienterprise import IQueuer
+from zope.interface.interface import Interface
+from twext.enterprise.locking import NamedLock
+
+
+class _IJobPerformer(Interface):
+ """
+ An object that can perform work.
+
+ Internal interface; implemented by several classes here since work has to
+ (in the worst case) pass from worker->controller->controller->worker.
+ """
+
+ def performJob(jobID): #@NoSelf
+ """
+ @param jobID: The primary key identifier of the given job.
+ @type jobID: L{int}
+
+ @return: a L{Deferred} firing with an empty dictionary when the work is
+ complete.
+ @rtype: L{Deferred} firing L{dict}
+ """
+
+
+
+def makeNodeSchema(inSchema):
+ """
+ Create a self-contained schema for L{NodeInfo} to use, in C{inSchema}.
+
+ @param inSchema: a L{Schema} to add the node-info table to.
+ @type inSchema: L{Schema}
+
+ @return: a schema with just the one table.
+ """
+ # Initializing this duplicate schema avoids a circular dependency, but this
+ # should really be accomplished with independent schema objects that the
+ # transaction is made aware of somehow.
+ NodeTable = Table(inSchema, "NODE_INFO")
+
+ NodeTable.addColumn("HOSTNAME", SQLType("varchar", 255))
+ NodeTable.addColumn("PID", SQLType("integer", None))
+ NodeTable.addColumn("PORT", SQLType("integer", None))
+ NodeTable.addColumn("TIME", SQLType("timestamp", None)).setDefaultValue(
+ # Note: in the real data structure, this is actually a not-cleaned-up
+ # sqlparse internal data structure, but it *should* look closer to
+ # this.
+ ProcedureCall("timezone", ["UTC", NamedValue("CURRENT_TIMESTAMP")])
+ )
+ for column in NodeTable.columns:
+ NodeTable.tableConstraint(Constraint.NOT_NULL, [column.name])
+ NodeTable.primaryKey = [
+ NodeTable.columnNamed("HOSTNAME"),
+ NodeTable.columnNamed("PORT"),
+ ]
+
+ return inSchema
+
+NodeInfoSchema = SchemaSyntax(makeNodeSchema(Schema(__file__)))
+
+
+
+def makeJobSchema(inSchema):
+ """
+ Create a self-contained schema for L{JobItem} to use, in C{inSchema}.
+
+ @param inSchema: a L{Schema} to add the job table to.
+ @type inSchema: L{Schema}
+
+ @return: a schema with just the one table.
+ """
+ # Initializing this duplicate schema avoids a circular dependency, but this
+ # should really be accomplished with independent schema objects that the
+ # transaction is made aware of somehow.
+ JobTable = Table(inSchema, "JOB")
+
+ JobTable.addColumn("JOB_ID", SQLType("integer", None)).setDefaultValue(
+ ProcedureCall("nextval", ["JOB_SEQ"])
+ )
+ JobTable.addColumn("WORK_TYPE", SQLType("varchar", 255))
+ JobTable.addColumn("PRIORITY", SQLType("integer", 0))
+ JobTable.addColumn("WEIGHT", SQLType("integer", 0))
+ JobTable.addColumn("NOT_BEFORE", SQLType("timestamp", None))
+ JobTable.addColumn("NOT_AFTER", SQLType("timestamp", None))
+ for column in ("JOB_ID", "WORK_TYPE"):
+ JobTable.tableConstraint(Constraint.NOT_NULL, [column])
+ JobTable.primaryKey = [JobTable.columnNamed("JOB_ID"), ]
+
+ return inSchema
+
+JobInfoSchema = SchemaSyntax(makeJobSchema(Schema(__file__)))
+
+
+
+@inlineCallbacks
+def inTransaction(transactionCreator, operation):
+ """
+ Perform the given operation in a transaction, committing or aborting as
+ required.
+
+ @param transactionCreator: a 0-arg callable that returns an
+ L{IAsyncTransaction}
+
+ @param operation: a 1-arg callable that takes an L{IAsyncTransaction} and
+ returns a value.
+
+ @return: a L{Deferred} that fires with C{operation}'s result or fails with
+ its error, unless there is an error creating, aborting or committing
+ the transaction.
+ """
+ txn = transactionCreator()
+ try:
+ result = yield operation(txn)
+ except:
+ f = Failure()
+ yield txn.abort()
+ returnValue(f)
+ else:
+ yield txn.commit()
+ returnValue(result)
+
+
+
+def astimestamp(v):
+ """
+ Convert the given datetime to a POSIX timestamp.
+ """
+ return (v - datetime.utcfromtimestamp(0)).total_seconds()
+
+
+
+class NodeInfo(Record, fromTable(NodeInfoSchema.NODE_INFO)):
+ """
+ A L{NodeInfo} is information about a currently-active Node process.
+ """
+
+ def endpoint(self, reactor):
+ """
+ Create an L{IStreamClientEndpoint} that will talk to the node process
+ that is described by this L{NodeInfo}.
+
+ @return: an endpoint that will connect to this host.
+ @rtype: L{IStreamClientEndpoint}
+ """
+ return TCP4ClientEndpoint(reactor, self.hostname, self.port)
+
+
+
+def abstract(thunk):
+ """
+ The decorated function is abstract.
+
+ @note: only methods are currently supported.
+ """
+ @classmethod
+ @wraps(thunk)
+ def inner(cls, *a, **k):
+ raise NotImplementedError(
+ qual(cls) + " does not implement " + thunk.func_name
+ )
+ return inner
+
+
+
+class JobItem(Record, fromTable(JobInfoSchema.JOB)):
+ """
+ An item in the job table. This is typically not directly used by code creating
+ work items, but rather is used for internal book keeping of jobs associated with
+ work items.
+ """
+
+ @inlineCallbacks
+ def getWorkForJob(self):
+ workItemClass = WorkItem.forTableName(self.workType)
+ workItems = yield workItemClass.loadForJob(self.transaction, self.jobID)
+ returnValue(workItems[0] if len(workItems) == 1 else None)
+
+
+ @inlineCallbacks
+ def run(self):
+ """
+ Run this job item by finding the appropriate work item class and running that.
+ """
+ workItem = yield self.getWorkForJob()
+ if workItem is not None:
+ if workItem.group is not None:
+ yield NamedLock.acquire(self.transaction, workItem.group)
+
+ try:
+ # Once the work is done we delete ourselves
+ yield workItem.delete()
+ except NoSuchRecord:
+ # The record has already been removed
+ pass
+ else:
+ yield workItem.doWork()
+
+ try:
+ # Once the work is done we delete ourselves
+ yield self.delete()
+ except NoSuchRecord:
+ # The record has already been removed
+ pass
+
+
+ @classmethod
+ @inlineCallbacks
+ def histogram(cls, txn):
+ """
+ Generate a histogram of work items currently in the queue.
+ """
+ jb = JobInfoSchema.JOB
+ rows = yield Select(
+ [jb.WORK_TYPE, Count(jb.WORK_TYPE)],
+ From=jb,
+ GroupBy=jb.WORK_TYPE
+ ).on(txn)
+ results = dict(rows)
+
+ # Add in empty data for other work
+ allwork = WorkItem.__subclasses__()
+ for workitem in allwork:
+ if workitem.table.model.name not in results:
+ results[workitem.table.model.name] = 0
+ returnValue(results)
+
+
+ @classmethod
+ def numberOfWorkTypes(cls):
+ return len(WorkItem.__subclasses__())
+
+
+# Priority for work - used to order work items in the job queue
+WORK_PRIORITY_LOW = 1
+WORK_PRIORITY_MEDIUM = 2
+WORK_PRIORITY_HIGH = 3
+
+
+class WorkItem(Record):
+ """
+ A L{WorkItem} is an item of work which may be stored in a database, then
+ executed later.
+
+ L{WorkItem} is an abstract class, since it is a L{Record} with no table
+ associated via L{fromTable}. Concrete subclasses must associate a specific
+ table by inheriting like so::
+
+ class MyWorkItem(WorkItem, fromTable(schema.MY_TABLE)):
+
+ Concrete L{WorkItem}s should generally not be created directly; they are
+ both created and thereby implicitly scheduled to be executed by calling
+ L{enqueueWork <twext.enterprise.ienterprise.IQueuer.enqueueWork>} with the
+ appropriate L{WorkItem} concrete subclass. There are different queue
+ implementations (L{PeerConnectionPool} and L{LocalQueuer}, for example), so
+ the exact timing and location of the work execution may differ.
+
+ L{WorkItem}s may be constrained in the ordering and timing of their
+ execution, to control concurrency and for performance reasons respectively.
+
+ Although all the usual database mutual-exclusion rules apply to work
+ executed in L{WorkItem.doWork}, implicit database row locking is not always
+ the best way to manage concurrency. They have some problems, including:
+
+ - implicit locks are easy to accidentally acquire out of order, which
+ can lead to deadlocks
+
+ - implicit locks are easy to forget to acquire correctly - for example,
+ any read operation which subsequently turns into a write operation
+ must have been acquired with C{Select(..., ForUpdate=True)}, but it
+ is difficult to consistently indicate that methods which abstract out
+ read operations must pass this flag in certain cases and not others.
+
+ - implicit locks are held until the transaction ends, which means that
+ if expensive (long-running) queue operations share the same lock with
+ cheap (short-running) queue operations or user interactions, the
+ cheap operations all have to wait for the expensive ones to complete,
+ but continue to consume whatever database resources they were using.
+
+ In order to ameliorate these problems with potentially concurrent work
+ that uses the same resources, L{WorkItem} provides a database-wide mutex
+ that is automatically acquired at the beginning of the transaction and
+ released at the end. To use it, simply L{align
+ <twext.enterprise.dal.record.Record.namingConvention>} the C{group}
+ attribute on your L{WorkItem} subclass with a column holding a string
+ (varchar). L{WorkItem} subclasses with the same value for C{group} will
+ not execute their C{doWork} methods concurrently. Furthermore, if the lock
+ cannot be quickly acquired, database resources associated with the
+ transaction attempting it will be released, and the transaction rolled back
+ until a future transaction I{can} acquire it quickly. If you do not
+ want any limits to concurrency, simply leave it set to C{None}.
+
+ In some applications it's possible to coalesce work together; to grab
+ multiple L{WorkItem}s in one C{doWork} transaction. All you need to do is
+ to delete the rows which back other L{WorkItem}s from the database, and
+ they won't be processed. Using the C{group} attribute, you can easily
+ prevent concurrency so that you can easily group these items together and
+ remove them as a set (otherwise, other workers might be attempting to
+ concurrently work on them and you'll get deletion errors).
+
+ However, if doing more work at once is less expensive, and you want to
+ avoid processing lots of individual rows in tiny transactions, you may also
+ delay the execution of a L{WorkItem} by setting its C{notBefore} attribute.
+ This must be backed by a database timestamp, so that processes which happen
+ to be restarting and examining the work to be done in the database don't
+ jump the gun and do it too early.
+
+ @cvar workID: the unique identifier (primary key) for items of this type.
+ On an instance of a concrete L{WorkItem} subclass, this attribute must
+ be an integer; on the concrete L{WorkItem} subclass itself, this
+ attribute must be a L{twext.enterprise.dal.syntax.ColumnSyntax}. Note
+ that this is automatically taken care of if you simply have a
+ corresponding C{work_id} column in the associated L{fromTable} on your
+ L{WorkItem} subclass. This column must be unique, and it must be an
+ integer. In almost all cases, this column really ought to be filled
+ out by a database-defined sequence; if not, you need some other
+ mechanism for establishing a cluster-wide sequence.
+ @type workID: L{int} on instance,
+ L{twext.enterprise.dal.syntax.ColumnSyntax} on class.
+
+ @cvar notBefore: the timestamp before which this item should I{not} be
+ processed. If unspecified, this should be the date and time of the
+ creation of the L{WorkItem}.
+ @type notBefore: L{datetime.datetime} on instance,
+ L{twext.enterprise.dal.syntax.ColumnSyntax} on class.
+
+ @ivar group: If not C{None}, a unique-to-the-database identifier for which
+ only one L{WorkItem} will execute at a time.
+ @type group: L{unicode} or L{NoneType}
+ """
+
+ group = None
+ priority = WORK_PRIORITY_LOW # Default - subclasses should override
+
+
+ @classmethod
+ @inlineCallbacks
+ def makeJob(cls, transaction, **kwargs):
+ """
+ A new work item needs to be created. First we create a Job record, then we create
+ the actual work item related to the job.
+
+ @param transaction: the transaction to use
+ @type transaction: L{IAsyncTransaction}
+ """
+
+ jobargs = {
+ "workType": cls.table.model.name
+ }
+ def _transferArg(name):
+ if name in kwargs:
+ jobargs[name] = kwargs[name]
+ del kwargs[name]
+
+ _transferArg("jobID")
+ if "priority" in kwargs:
+ _transferArg("priority")
+ else:
+ jobargs["priority"] = cls.priority
+ _transferArg("weight")
+ _transferArg("notBefore")
+ _transferArg("notAfter")
+
+ job = yield JobItem.create(transaction, **jobargs)
+
+ kwargs["jobID"] = job.jobID
+ work = yield cls.create(transaction, **kwargs)
+ work.__dict__["job"] = job
+ returnValue(work)
+
+
+ @classmethod
+ @inlineCallbacks
+ def loadForJob(cls, txn, jobID):
+ workItems = yield cls.query(txn, (cls.jobID == jobID))
+ returnValue(workItems)
+
+
+ def doWork(self):
+ """
+ Subclasses must implement this to actually perform the queued work.
+
+ This method will be invoked in a worker process.
+
+ This method does I{not} need to delete the row referencing it; that
+ will be taken care of by the job queuing machinery.
+ """
+ raise NotImplementedError
+
+
+ @classmethod
+ def forTableName(cls, tableName):
+ """
+ Look up a work-item class given a particular table name. Factoring
+ this correctly may place it into L{twext.enterprise.record.Record}
+ instead; it is probably generally useful to be able to look up a mapped
+ class from a table.
+
+ @param tableName: the name of the table to look up
+ @type tableName: L{str}
+
+ @return: the relevant subclass
+ @rtype: L{type}
+ """
+ for subcls in cls.__subclasses__():
+ clstable = getattr(subcls, "table", None)
+ if tableName == clstable.model.name:
+ return subcls
+ raise KeyError("No mapped {0} class for {1}.".format(
+ cls, tableName
+ ))
+
+
+
+class PerformJob(Command):
+ """
+ Notify another process that it must do a job that has been persisted to
+ the database, by informing it of the job ID.
+ """
+
+ arguments = [
+ ("jobID", Integer()),
+ ]
+ response = []
+
+
+
+class ReportLoad(Command):
+ """
+ Notify another node of the total, current load for this whole node (all of
+ its workers).
+ """
+ arguments = [
+ ("load", Integer())
+ ]
+ response = []
+
+
+
+class IdentifyNode(Command):
+ """
+ Identify this node to its peer. The connector knows which hostname it's
+ looking for, and which hostname it considers itself to be, only the
+ initiator (not the listener) issues this command. This command is
+ necessary because we don't want to rely on DNS; if reverse DNS weren't set
+ up perfectly, the listener would not be able to identify its peer, and it
+ is easier to modify local configuration so that L{socket.getfqdn} returns
+ the right value than to ensure that DNS does.
+ """
+
+ arguments = [
+ ("host", String()),
+ ("port", Integer()),
+ ]
+
+
+
+class ConnectionFromPeerNode(AMP):
+ """
+ A connection to a peer node. Symmetric; since the "client" and the
+ "server" both serve the same role, the logic is the same in every node.
+
+ @ivar localWorkerPool: the pool of local worker processes that can process
+ queue work.
+ @type localWorkerPool: L{WorkerConnectionPool}
+
+ @ivar _reportedLoad: The number of outstanding requests being processed by
+ the peer of this connection, from all requestors (both the host of this
+ connection and others), as last reported by the most recent
+ L{ReportLoad} message received from the peer.
+ @type _reportedLoad: L{int}
+
+ @ivar _bonusLoad: The number of additional outstanding requests being
+ processed by the peer of this connection; the number of requests made
+ by the host of this connection since the last L{ReportLoad} message.
+ @type _bonusLoad: L{int}
+ """
+ implements(_IJobPerformer)
+
+ def __init__(self, peerPool, boxReceiver=None, locator=None):
+ """
+ Initialize this L{ConnectionFromPeerNode} with a reference to a
+ L{PeerConnectionPool}, as well as required initialization arguments for
+ L{AMP}.
+
+ @param peerPool: the connection pool within which this
+ L{ConnectionFromPeerNode} is a participant.
+ @type peerPool: L{PeerConnectionPool}
+
+ @see: L{AMP.__init__}
+ """
+ self.peerPool = peerPool
+ self._bonusLoad = 0
+ self._reportedLoad = 0
+ super(ConnectionFromPeerNode, self).__init__(
+ boxReceiver, locator
+ )
+
+
+ def reportCurrentLoad(self):
+ """
+ Report the current load for the local worker pool to this peer.
+ """
+ return self.callRemote(ReportLoad, load=self.totalLoad())
+
+
+ @ReportLoad.responder
+ def reportedLoad(self, load):
+ """
+ The peer reports its load.
+ """
+ self._reportedLoad = (load - self._bonusLoad)
+ return {}
+
+
+ def startReceivingBoxes(self, sender):
+ """
+ Connection is up and running; add this to the list of active peers.
+ """
+ r = super(ConnectionFromPeerNode, self).startReceivingBoxes(sender)
+ self.peerPool.addPeerConnection(self)
+ return r
+
+
+ def stopReceivingBoxes(self, reason):
+ """
+ The connection has shut down; remove this from the list of active
+ peers.
+ """
+ self.peerPool.removePeerConnection(self)
+ r = super(ConnectionFromPeerNode, self).stopReceivingBoxes(reason)
+ return r
+
+
+ def currentLoadEstimate(self):
+ """
+ What is the current load estimate for this peer?
+
+ @return: The number of full "slots", i.e. currently-being-processed
+ queue items (and other items which may contribute to this process's
+ load, such as currently-being-processed client requests).
+ @rtype: L{int}
+ """
+ return self._reportedLoad + self._bonusLoad
+
+
+ def performJob(self, jobID):
+ """
+ A L{local worker connection &lt;ConnectionFromWorker&gt;} is asking this
+ specific peer node-controller process to perform a job, having
+ already determined that it's appropriate.
+
+ @see: L{_IJobPerformer.performJob}
+ """
+ d = self.callRemote(PerformJob, jobID=jobID)
+ self._bonusLoad += 1
+
+ @d.addBoth
+ def performed(result):
+ self._bonusLoad -= 1
+ return result
+
+ @d.addCallback
+ def success(result):
+ return None
+
+ return d
+
+
+ @PerformJob.responder
+ def dispatchToWorker(self, jobID):
+ """
+ A remote peer node has asked this node to do a job; dispatch it to
+ a local worker on this node.
+
+ @param jobID: the identifier of the job.
+ @type jobID: L{int}
+
+ @return: a L{Deferred} that fires when the work has been completed.
+ """
+ d = self.peerPool.performJobForPeer(jobID)
+ d.addCallback(lambda ignored: {})
+ return d
+
+
+ @IdentifyNode.responder
+ def identifyPeer(self, host, port):
+ self.peerPool.mapPeer(host, port, self)
+ return {}
+
+
+
+class WorkerConnectionPool(object):
+ """
+ A pool of L{ConnectionFromWorker}s.
+
+ L{WorkerConnectionPool} also implements the same implicit protocol as a
+ L{ConnectionFromPeerNode}, but one that dispenses work to the local worker
+ processes rather than to a remote connection pool.
+ """
+ implements(_IJobPerformer)
+
+ def __init__(self, maximumLoadPerWorker=5):
+ self.workers = []
+ self.maximumLoadPerWorker = maximumLoadPerWorker
+
+
+ def addWorker(self, worker):
+ """
+ Add a L{ConnectionFromWorker} to this L{WorkerConnectionPool} so that
+ it can be selected.
+ """
+ self.workers.append(worker)
+
+
+ def removeWorker(self, worker):
+ """
+ Remove a L{ConnectionFromWorker} from this L{WorkerConnectionPool} that
+ was previously added.
+ """
+ self.workers.remove(worker)
+
+
+ def hasAvailableCapacity(self):
+ """
+ Does this worker connection pool have any local workers who have spare
+ capacity to process another queue item?
+ """
+ for worker in self.workers:
+ if worker.currentLoad < self.maximumLoadPerWorker:
+ return True
+ return False
+
+
+ def allWorkerLoad(self):
+ """
+ The total load of all currently connected workers.
+ """
+ return sum(worker.currentLoad for worker in self.workers)
+
+
+ def _selectLowestLoadWorker(self):
+ """
+ Select the local connection with the lowest current load, even if
+ that worker is already at or above its maximum load.
+
+ @return: a worker connection with the lowest current load.
+ @rtype: L{ConnectionFromWorker}
+ """
+ return sorted(self.workers[:], key=lambda w: w.currentLoad)[0]
+
+
+ def performJob(self, jobID):
+ """
+ Select a local worker that is idle enough to perform the given job,
+ then ask them to perform it.
+
+ @param jobID: The primary key identifier of the given job.
+ @type jobID: L{int}
+
+ @return: a L{Deferred} firing with an empty dictionary when the work is
+ complete.
+ @rtype: L{Deferred} firing L{dict}
+ """
+ preferredWorker = self._selectLowestLoadWorker()
+ result = preferredWorker.performJob(jobID)
+ return result
+
+
+
+class ConnectionFromWorker(AMP):
+ """
+ An individual connection from a worker, as seen from the master's
+ perspective. L{ConnectionFromWorker}s go into a L{WorkerConnectionPool}.
+ """
+
+ def __init__(self, peerPool, boxReceiver=None, locator=None):
+ super(ConnectionFromWorker, self).__init__(boxReceiver, locator)
+ self.peerPool = peerPool
+ self._load = 0
+
+
+ @property
+ def currentLoad(self):
+ """
+ What is the current load of this worker?
+ """
+ return self._load
+
+
+ def startReceivingBoxes(self, sender):
+ """
+ Start receiving AMP boxes from the peer. Initialize all necessary
+ state.
+ """
+ result = super(ConnectionFromWorker, self).startReceivingBoxes(sender)
+ self.peerPool.workerPool.addWorker(self)
+ return result
+
+
+ def stopReceivingBoxes(self, reason):
+ """
+ AMP boxes will no longer be received.
+ """
+ result = super(ConnectionFromWorker, self).stopReceivingBoxes(reason)
+ self.peerPool.workerPool.removeWorker(self)
+ return result
+
+
+ @PerformJob.responder
+ def performJob(self, jobID):
+ """
+ Dispatch a job to this worker.
+
+ @see: The responder for this should always be
+ L{ConnectionFromController.actuallyReallyExecuteJobHere}.
+ """
+ d = self.callRemote(PerformJob, jobID=jobID)
+ self._load += 1
+
+ @d.addBoth
+ def f(result):
+ self._load -= 1
+ return result
+
+ return d
+
+
+
+class ConnectionFromController(AMP):
+ """
+ A L{ConnectionFromController} is the connection to a node-controller
+ process, in a worker process. It processes requests from its own
+ controller to do work. It is the opposite end of the connection from
+ L{ConnectionFromWorker}.
+ """
+ implements(IQueuer)
+
+ def __init__(self, transactionFactory, whenConnected,
+ boxReceiver=None, locator=None):
+ super(ConnectionFromController, self).__init__(boxReceiver, locator)
+ self.transactionFactory = transactionFactory
+ self.whenConnected = whenConnected
+ # FIXME: Glyph it appears WorkProposal expects this to have reactor...
+ from twisted.internet import reactor
+ self.reactor = reactor
+
+
+ def startReceivingBoxes(self, sender):
+ super(ConnectionFromController, self).startReceivingBoxes(sender)
+ self.whenConnected(self)
+
+
+ def choosePerformer(self):
+ """
+ To conform with L{WorkProposal}'s expectations, which may run in either
+ a controller (against a L{PeerConnectionPool}) or in a worker (against
+ a L{ConnectionFromController}), this is implemented to always return
+ C{self}, since C{self} is also an object that has a C{performJob}
+ method.
+ """
+ return self
+
+
+ def performJob(self, jobID):
+ """
+ Ask the controller to perform a job on our behalf.
+ """
+ return self.callRemote(PerformJob, jobID=jobID)
+
+
+ @inlineCallbacks
+ def enqueueWork(self, txn, workItemType, **kw):
+ """
+ There is some work to do. Do it, ideally someplace else, ideally in
+ parallel. Later, let the caller know that the work has been completed
+ by firing a L{Deferred}.
+
+ @param workItemType: The type of work item to be enqueued.
+ @type workItemType: A subtype of L{WorkItem}
+
+ @param kw: The parameters to construct a work item.
+ @type kw: keyword parameters to C{workItemType.create}, i.e.
+ C{workItemType.__init__}
+
+ @return: an object that can track the enqueuing and remote execution of
+ this work.
+ @rtype: L{WorkProposal}
+ """
+ wp = WorkProposal(self, txn, workItemType, kw)
+ yield wp._start()
+ returnValue(wp)
+
+
+ @PerformJob.responder
+ def actuallyReallyExecuteJobHere(self, jobID):
+ """
+ This is where it's time to actually do the job. The controller
+ process has instructed this worker to do it; so, look up the data in
+ the row, and do it.
+ """
+ d = ultimatelyPerform(self.transactionFactory, jobID)
+ d.addCallback(lambda ignored: {})
+ return d
+
+
+
+def ultimatelyPerform(txnFactory, jobID):
+ """
+ Eventually, after routing the job to the appropriate place, somebody
+ actually has to I{do} it.
+
+ @param txnFactory: a 0- or 1-argument callable that creates an
+ L{IAsyncTransaction}
+ @type txnFactory: L{callable}
+
+ @param jobID: the ID of the job to be performed
+ @type jobID: L{int}
+
+ @return: a L{Deferred} which fires with C{None} when the job has been
+ performed, or fails if the job can't be performed.
+ """
+ @inlineCallbacks
+ def runJob(txn):
+ try:
+ job = yield JobItem.load(txn, jobID)
+ yield job.run()
+ except NoSuchRecord:
+ # The record has already been removed
+ pass
+
+ return inTransaction(txnFactory, runJob)
+
+
+
+class LocalPerformer(object):
+ """
+ Implementor of C{performJob} that does its work in the local process,
+ regardless of other conditions.
+ """
+ implements(_IJobPerformer)
+
+ def __init__(self, txnFactory):
+ """
+ Create this L{LocalPerformer} with a transaction factory.
+ """
+ self.txnFactory = txnFactory
+
+
+ def performJob(self, jobID):
+ """
+ Perform the given job right now.
+ """
+ return ultimatelyPerform(self.txnFactory, jobID)
+
+
+
+class WorkerFactory(Factory, object):
+ """
+ Factory, to be used as the client to connect from the worker to the
+ controller.
+ """
+
+ def __init__(self, transactionFactory, whenConnected):
+ """
+ Create a L{WorkerFactory} with a transaction factory and a callable to
+ invoke with the protocol once it is connected.
+ """
+ self.transactionFactory = transactionFactory
+ self.whenConnected = whenConnected
+
+
+ def buildProtocol(self, addr):
+ """
+ Create a L{ConnectionFromController} using this factory's
+ transactionFactory and whenConnected callable.
+ """
+ return ConnectionFromController(self.transactionFactory, self.whenConnected)
+
+
+
+class TransactionFailed(Exception):
+ """
+ A transaction failed.
+ """
+
+
+
+def _cloneDeferred(d):
+ """
+ Make a new Deferred, adding callbacks to C{d}.
+
+ @return: another L{Deferred} that fires with C{d's} result when C{d} fires.
+ @rtype: L{Deferred}
+ """
+ d2 = Deferred()
+ d.chainDeferred(d2)
+ return d2
+
+
+
+class WorkProposal(object):
+ """
+ A L{WorkProposal} is a proposal for work that will be executed, perhaps on
+ another node, perhaps in the future.
+
+ @ivar _chooser: The object which will choose where the work in this
+ proposal gets performed. This must have both a C{choosePerformer}
+ method and a C{reactor} attribute, providing an L{IReactorTime}.
+ @type _chooser: L{PeerConnectionPool} or L{LocalQueuer}
+
+ @ivar txn: The transaction where the work will be enqueued.
+ @type txn: L{IAsyncTransaction}
+
+ @ivar workItemType: The type of work to be enqueued by this L{WorkProposal}
+ @type workItemType: L{WorkItem} subclass
+
+ @ivar kw: The keyword arguments to pass to C{self.workItemType.create} to
+ construct it.
+ @type kw: L{dict}
+ """
+
+ def __init__(self, chooser, txn, workItemType, kw):
+ self._chooser = chooser
+ self.txn = txn
+ self.workItemType = workItemType
+ self.kw = kw
+ self._whenExecuted = Deferred()
+ self._whenCommitted = Deferred()
+ self.workItem = None
+
+
+ @inlineCallbacks
+ def _start(self):
+ """
+ Execute this L{WorkProposal} by creating the work item in the database,
+ waiting for the transaction where that addition was completed to
+ commit, and asking the local node controller process to do the work.
+ """
+ try:
+ created = yield self.workItemType.makeJob(self.txn, **self.kw)
+ except Exception:
+ self._whenCommitted.errback(TransactionFailed)
+ raise
+ else:
+ self.workItem = created
+
+ @self.txn.postCommit
+ def whenDone():
+ self._whenCommitted.callback(self)
+
+ def maybeLater():
+ performer = self._chooser.choosePerformer()
+
+ @passthru(
+ performer.performJob(created.jobID).addCallback
+ )
+ def performed(result):
+ self._whenExecuted.callback(self)
+
+ @performed.addErrback
+ def notPerformed(why):
+ self._whenExecuted.errback(why)
+
+ reactor = self._chooser.reactor
+ when = max(0, astimestamp(created.job.notBefore) - reactor.seconds()) if created.job.notBefore is not None else 0
+ # TODO: Track the returned DelayedCall so it can be stopped
+ # when the service stops.
+ self._chooser.reactor.callLater(when, maybeLater)
+
+ @self.txn.postAbort
+ def whenFailed():
+ self._whenCommitted.errback(TransactionFailed)
+
+
+ def whenExecuted(self):
+ """
+ Let the caller know when the proposed work has been fully executed.
+
+ @note: The L{Deferred} returned by C{whenExecuted} should be used with
+ extreme caution. If an application decides to do any
+ database-persistent work as a result of this L{Deferred} firing,
+ that work I{may be lost} as a result of a service being normally
+ shut down between the time that the work is scheduled and the time
+ that it is executed. So, the only things that should be added as
+ callbacks to this L{Deferred} are those which are ephemeral, in
+ memory, and reflect only presentation state associated with the
+ user's perception of the completion of work, not logical chains of
+ work which need to be completed in sequence; those should all be
+ completed within the transaction of the L{WorkItem.doWork} that
+ gets executed.
+
+ @return: a L{Deferred} that fires with this L{WorkProposal} when the
+ work has been completed remotely.
+ """
+ return _cloneDeferred(self._whenExecuted)
+
+
+ def whenProposed(self):
+ """
+ Let the caller know when the work has been proposed; i.e. when the work
+ is first transmitted to the database.
+
+ @return: a L{Deferred} that fires with this L{WorkProposal} when the
+ relevant commands have been sent to the database to create the
+ L{WorkItem}, and fails if those commands do not succeed for some
+ reason.
+ """
+ return succeed(self)
+
+
+ def whenCommitted(self):
+ """
+ Let the caller know when the work has been committed to; i.e. when the
+ transaction where the work was proposed has been committed to the
+ database.
+
+ @return: a L{Deferred} that fires with this L{WorkProposal} when the
+ relevant transaction has been committed, or fails if the
+ transaction is not committed for any reason.
+ """
+ return _cloneDeferred(self._whenCommitted)
+
+
+
+class _BaseQueuer(object):
+ implements(IQueuer)
+
+ def __init__(self):
+ super(_BaseQueuer, self).__init__()
+ self.proposalCallbacks = set()
+
+
+ def callWithNewProposals(self, callback):
+ self.proposalCallbacks.add(callback)
+
+
+ def transferProposalCallbacks(self, newQueuer):
+ newQueuer.proposalCallbacks = self.proposalCallbacks
+ return newQueuer
+
+
+ @inlineCallbacks
+ def enqueueWork(self, txn, workItemType, **kw):
+ """
+ There is some work to do. Do it, someplace else, ideally in parallel.
+ Later, let the caller know that the work has been completed by firing a
+ L{Deferred}.
+
+ @param workItemType: The type of work item to be enqueued.
+ @type workItemType: A subtype of L{WorkItem}
+
+ @param kw: The parameters to construct a work item.
+ @type kw: keyword parameters to C{workItemType.create}, i.e.
+ C{workItemType.__init__}
+
+ @return: an object that can track the enqueuing and remote execution of
+ this work.
+ @rtype: L{WorkProposal}
+ """
+ wp = WorkProposal(self, txn, workItemType, kw)
+ yield wp._start()
+ for callback in self.proposalCallbacks:
+ callback(wp)
+ returnValue(wp)
+
+
+
+class PeerConnectionPool(_BaseQueuer, MultiService, object):
+ """
+ Each node has a L{PeerConnectionPool} connecting it to all the other nodes
+ currently active on the same database.
+
+ @ivar hostname: The hostname where this node process is running, as
+ reported by the local host's configuration. Possibly this should be
+ obtained via C{config.ServerHostName} instead of C{socket.getfqdn()};
+ although hosts within a cluster may be configured with the same
+ C{ServerHostName}; TODO need to confirm.
+ @type hostname: L{bytes}
+
+ @ivar thisProcess: a L{NodeInfo} representing this process, which is
+ initialized when this L{PeerConnectionPool} service is started via
+ C{startService}. May be C{None} if this service is not fully started
+ up or if it is shutting down.
+ @type thisProcess: L{NodeInfo}
+
+ @ivar queueProcessTimeout: The amount of time after a L{WorkItem} is
+ scheduled to be processed (its C{notBefore} attribute) that it is
+ considered to be "orphaned" and will be run by a lost-work check rather
+ than waiting for it to be requested. By default, 10 minutes.
+ @type queueProcessTimeout: L{float} (in seconds)
+
+ @ivar queueDelayedProcessInterval: The amount of time between database
+ pings, i.e. checks for over-due queue items that might have been
+ orphaned by a controller process that died mid-transaction. This is
+ how often the shared database should be pinged by I{all} nodes (i.e.,
+ all controller processes, or each instance of L{PeerConnectionPool});
+ each individual node will ping commensurately less often as more nodes
+ join the database.
+ @type queueDelayedProcessInterval: L{float} (in seconds)
+
+ @ivar reactor: The reactor used for scheduling timed events.
+ @type reactor: L{IReactorTime} provider.
+
+ @ivar peers: The list of currently connected peers.
+ @type peers: L{list} of L{ConnectionFromPeerNode}
+ """
+ implements(IQueuer)
+
+ from socket import getfqdn
+ from os import getpid
+ getfqdn = staticmethod(getfqdn)
+ getpid = staticmethod(getpid)
+
+ queueProcessTimeout = (10.0 * 60.0)
+ queueDelayedProcessInterval = (60.0)
+
+ def __init__(self, reactor, transactionFactory, ampPort):
+ """
+ Initialize a L{PeerConnectionPool}.
+
+ @param ampPort: The AMP TCP port number to listen on for inter-host
+ communication. This must be an integer (and not, say, an endpoint,
+ or an endpoint description) because we need to communicate it to
+ the other peers in the cluster in a way that will be meaningful to
+ them as clients.
+ @type ampPort: L{int}
+
+ @param transactionFactory: a 0- or 1-argument callable that produces an
+ L{IAsyncTransaction}
+ """
+ super(PeerConnectionPool, self).__init__()
+ self.reactor = reactor
+ self.transactionFactory = transactionFactory
+ self.hostname = self.getfqdn()
+ self.pid = self.getpid()
+ self.ampPort = ampPort
+ self.thisProcess = None
+ self.workerPool = WorkerConnectionPool()
+ self.peers = []
+ self.mappedPeers = {}
+ self._startingUp = None
+ self._listeningPort = None
+ self._lastSeenTotalNodes = 1
+ self._lastSeenNodeIndex = 1
+
+
+ def addPeerConnection(self, peer):
+ """
+ Add a L{ConnectionFromPeerNode} to the active list of peers.
+ """
+ self.peers.append(peer)
+
+
+ def totalLoad(self):
+ return self.workerPool.allWorkerLoad()
+
+
+ def workerListenerFactory(self):
+ """
+ Factory that listens for connections from workers.
+ """
+ f = Factory()
+ f.buildProtocol = lambda addr: ConnectionFromWorker(self)
+ return f
+
+
+ def removePeerConnection(self, peer):
+ """
+ Remove a L{ConnectionFromPeerNode} from the active list of peers.
+ """
+ self.peers.remove(peer)
+
+
+ def choosePerformer(self, onlyLocally=False):
+ """
+ Choose a peer to distribute work to based on the current known slot
+ occupancy of the other nodes. Note that this will prefer distributing
+ work to local workers until the current node is full, because that
+ should be lower-latency. Also, if no peers are available, work will be
+ submitted locally even if the worker pool is already over-subscribed.
+
+ @return: the chosen peer.
+ @rtype: L{_IJobPerformer} L{ConnectionFromPeerNode} or
+ L{WorkerConnectionPool}
+ """
+ if self.workerPool.hasAvailableCapacity():
+ return self.workerPool
+
+ if self.peers and not onlyLocally:
+ return sorted(self.peers, key=lambda p: p.currentLoadEstimate())[0]
+ else:
+ return LocalPerformer(self.transactionFactory)
+
+
+ def performJobForPeer(self, jobID):
+ """
+ A peer has requested us to perform a job; choose a job performer
+ local to this node, and then execute it.
+ """
+ performer = self.choosePerformer(onlyLocally=True)
+ return performer.performJob(jobID)
+
+
+ def totalNumberOfNodes(self):
+ """
+ How many nodes are there, total?
+
+ @return: the maximum number of other L{PeerConnectionPool} instances
+ that may be connected to the database described by
+ C{self.transactionFactory}. Note that this is not the current
+ count by connectivity, but the count according to the database.
+ @rtype: L{int}
+ """
+ # TODO
+ return self._lastSeenTotalNodes
+
+
+ def nodeIndex(self):
+ """
+ What ordinal does this node, i.e. this instance of
+ L{PeerConnectionPool}, occupy within the ordered set of all nodes
+ connected to the database described by C{self.transactionFactory}?
+
+ @return: the index of this node within the total collection. For
+ example, if this L{PeerConnectionPool} is 6 out of 30, this method
+ will return C{6}.
+ @rtype: L{int}
+ """
+ # TODO
+ return self._lastSeenNodeIndex
+
+
+ def _periodicLostWorkCheck(self):
+ """
+ Periodically, every node controller has to check to make sure that work
+ hasn't been dropped on the floor by someone. In order to do that it
+ queries each work-item table.
+ """
+ @inlineCallbacks
+ def workCheck(txn):
+ if self.thisProcess:
+ nodes = [(node.hostname, node.port) for node in
+ (yield self.activeNodes(txn))]
+ nodes.sort()
+ self._lastSeenTotalNodes = len(nodes)
+ self._lastSeenNodeIndex = nodes.index(
+ (self.thisProcess.hostname, self.thisProcess.port)
+ )
+
+ # TODO: here is where we should iterate over the unlocked items that
+ # are due, ordered by priority, notBefore etc
+ tooLate = datetime.utcfromtimestamp(
+ self.reactor.seconds() - self.queueProcessTimeout
+ )
+ overdueItems = (yield JobItem.query(
+ txn, (JobItem.notBefore < tooLate))
+ )
+ for overdueItem in overdueItems:
+ peer = self.choosePerformer()
+ yield peer.performJob(overdueItem.jobID)
+
+ return inTransaction(self.transactionFactory, workCheck)
+
+ _currentWorkDeferred = None
+ _lostWorkCheckCall = None
+
+ def _lostWorkCheckLoop(self):
+ """
+ While the service is running, keep checking for any overdue / lost work
+ items and re-submit them to the cluster for processing. Space out
+ those checks in time based on the size of the cluster.
+ """
+ self._lostWorkCheckCall = None
+
+ @passthru(
+ self._periodicLostWorkCheck().addErrback(log.err).addCallback
+ )
+ def scheduleNext(result):
+ self._currentWorkDeferred = None
+ if not self.running:
+ return
+ index = self.nodeIndex()
+ now = self.reactor.seconds()
+
+ interval = self.queueDelayedProcessInterval
+ count = self.totalNumberOfNodes()
+ when = (now - (now % interval)) + (interval * (count + index))
+ delay = when - now
+ self._lostWorkCheckCall = self.reactor.callLater(
+ delay, self._lostWorkCheckLoop
+ )
+
+ self._currentWorkDeferred = scheduleNext
+
+
+ def startService(self):
+ """
+ Register ourselves with the database and establish all outgoing
+ connections to other servers in the cluster.
+ """
+ @inlineCallbacks
+ def startup(txn):
+ endpoint = TCP4ServerEndpoint(self.reactor, self.ampPort)
+ # If this fails, the failure mode is going to be ugly, just like
+ # all conflicted-port failures. But, at least it won't proceed.
+ self._listeningPort = yield endpoint.listen(self.peerFactory())
+ self.ampPort = self._listeningPort.getHost().port
+ yield Lock.exclusive(NodeInfo.table).on(txn)
+ nodes = yield self.activeNodes(txn)
+ selves = [node for node in nodes
+ if ((node.hostname == self.hostname) and
+ (node.port == self.ampPort))]
+ if selves:
+ self.thisProcess = selves[0]
+ nodes.remove(self.thisProcess)
+ yield self.thisProcess.update(pid=self.pid,
+ time=datetime.now())
+ else:
+ self.thisProcess = yield NodeInfo.create(
+ txn, hostname=self.hostname, port=self.ampPort,
+ pid=self.pid, time=datetime.now()
+ )
+
+ for node in nodes:
+ self._startConnectingTo(node)
+
+ self._startingUp = inTransaction(self.transactionFactory, startup)
+
+ @self._startingUp.addBoth
+ def done(result):
+ self._startingUp = None
+ super(PeerConnectionPool, self).startService()
+ self._lostWorkCheckLoop()
+ return result
+
+
+ @inlineCallbacks
+ def stopService(self):
+ """
+ Stop this service, terminating any incoming or outgoing connections.
+ """
+ yield super(PeerConnectionPool, self).stopService()
+
+ if self._startingUp is not None:
+ yield self._startingUp
+
+ if self._listeningPort is not None:
+ yield self._listeningPort.stopListening()
+
+ if self._lostWorkCheckCall is not None:
+ self._lostWorkCheckCall.cancel()
+
+ if self._currentWorkDeferred is not None:
+ yield self._currentWorkDeferred
+
+ for peer in self.peers:
+ peer.transport.abortConnection()
+
+
+ def activeNodes(self, txn):
+ """
+ Load information about all nodes known to the database (callers filter
+ out this node's own entry where needed).
+ """
+ return NodeInfo.all(txn)
+
+
+ def mapPeer(self, host, port, peer):
+ """
+ A peer has been identified as belonging to the given host/port
+ combination. Any peer previously mapped to the same host/port is
+ replaced in the mapping; it is not currently disconnected (see TODO).
+ """
+ # if (host, port) in self.mappedPeers:
+ # TODO: think about this for race conditions
+ # self.mappedPeers.pop((host, port)).transport.loseConnection()
+ self.mappedPeers[(host, port)] = peer
+
+
+ def _startConnectingTo(self, node):
+ """
+ Start an outgoing connection to another master process.
+
+ @param node: a description of the master to connect to.
+ @type node: L{NodeInfo}
+ """
+ connected = node.endpoint(self.reactor).connect(self.peerFactory())
+
+ def whenConnected(proto):
+ self.mapPeer(node.hostname, node.port, proto)
+ proto.callRemote(
+ IdentifyNode,
+ host=self.thisProcess.hostname,
+ port=self.thisProcess.port
+ ).addErrback(noted, "identify")
+
+ def noted(err, x="connect"):
+ log.msg(
+ "Could not {0} to cluster peer {1} because {2}"
+ .format(x, node, str(err.value))
+ )
+
+ connected.addCallbacks(whenConnected, noted)
+
+
+ def peerFactory(self):
+ """
+ Factory for peer connections.
+
+ @return: a L{Factory} that will produce L{ConnectionFromPeerNode}
+ protocols attached to this L{PeerConnectionPool}.
+ """
+ return _PeerPoolFactory(self)
+
+
+
+class _PeerPoolFactory(Factory, object):
+ """
+ Protocol factory responsible for creating L{ConnectionFromPeerNode}
+ connections, both client and server.
+ """
+
+ def __init__(self, peerConnectionPool):
+ self.peerConnectionPool = peerConnectionPool
+
+
+ def buildProtocol(self, addr):
+ return ConnectionFromPeerNode(self.peerConnectionPool)
+
+
+
+class LocalQueuer(_BaseQueuer):
+ """
+ When work is enqueued with this queuer, it is just executed locally.
+ """
+ implements(IQueuer)
+
+ def __init__(self, txnFactory, reactor=None):
+ super(LocalQueuer, self).__init__()
+ self.txnFactory = txnFactory
+ if reactor is None:
+ from twisted.internet import reactor
+ self.reactor = reactor
+
+
+ def choosePerformer(self):
+ """
+ Choose to perform the work locally.
+ """
+ return LocalPerformer(self.txnFactory)
+
+
+
+class NonPerformer(object):
+ """
+ Implementor of C{performJob} that doesn't actually perform any work. This
+ is used in the case where you want to be able to enqueue work for someone
+ else to do, but not take on any work yourself (such as a command line
+ tool).
+ """
+ implements(_IJobPerformer)
+
+ def performJob(self, jobID):
+ """
+ Don't perform job.
+ """
+ return succeed(None)
+
+
+
+class NonPerformingQueuer(_BaseQueuer):
+ """
+ When work is enqueued with this queuer, it is never executed locally.
+ It's expected that the polling machinery will find the work and perform it.
+ """
+ implements(IQueuer)
+
+ def __init__(self, reactor=None):
+ super(NonPerformingQueuer, self).__init__()
+ if reactor is None:
+ from twisted.internet import reactor
+ self.reactor = reactor
+
+
+ def choosePerformer(self):
+ """
+ Choose a performer that does not perform the work at all.
+ """
+ return NonPerformer()
</ins></span></pre></div>
<a id="twexttrunktwextenterprisetesttest_adbapi2py"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/test/test_adbapi2.py (12780 => 12781)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/test/test_adbapi2.py        2014-03-01 15:45:25 UTC (rev 12780)
+++ twext/trunk/twext/enterprise/test/test_adbapi2.py        2014-03-01 16:17:48 UTC (rev 12781)
</span><span class="lines">@@ -192,13 +192,13 @@
</span><span class="cx"> a = self.createTransaction()
</span><span class="cx">
</span><span class="cx"> alphaResult = self.resultOf(a.execSQL("alpha"))
</span><del>- [[counter, echo]] = alphaResult[0]
</del><ins>+ [[_ignore_counter, _ignore_echo]] = alphaResult[0]
</ins><span class="cx">
</span><span class="cx"> b = self.createTransaction()
</span><span class="cx"> # "b" should have opened a connection.
</span><span class="cx"> self.assertEquals(len(self.factory.connections), 2)
</span><span class="cx"> betaResult = self.resultOf(b.execSQL("beta"))
</span><del>- [[bcounter, becho]] = betaResult[0]
</del><ins>+ [[bcounter, _ignore_becho]] = betaResult[0]
</ins><span class="cx">
</span><span class="cx"> # both "a" and "b" are holding open a connection now; let's try to open
</span><span class="cx"> # a third one. (The ordering will be deterministic even if this fails,
</span><span class="lines">@@ -214,13 +214,13 @@
</span><span class="cx"> commitResult = self.resultOf(b.commit())
</span><span class="cx">
</span><span class="cx"> # Now that "b" has committed, "c" should be able to complete.
</span><del>- [[ccounter, cecho]] = gammaResult[0]
</del><ins>+ [[ccounter, _ignore_cecho]] = gammaResult[0]
</ins><span class="cx">
</span><span class="cx"> # The connection for "a" ought to still be busy, so let's make sure
</span><span class="cx"> # we're using the one for "c".
</span><span class="cx"> self.assertEquals(ccounter, bcounter)
</span><span class="cx">
</span><del>- # Sanity check: the commit should have succeded!
</del><ins>+ # Sanity check: the commit should have succeeded!
</ins><span class="cx"> self.assertEquals(commitResult, [None])
</span><span class="cx">
</span><span class="cx">
</span><span class="lines">@@ -231,7 +231,7 @@
</span><span class="cx"> """
</span><span class="cx"> a = self.createTransaction()
</span><span class="cx"> alphaResult = self.resultOf(a.execSQL("alpha"))
</span><del>- [[[counter, echo]]] = alphaResult
</del><ins>+ [[[_ignore_counter, _ignore_echo]]] = alphaResult
</ins><span class="cx"> self.assertEquals(len(self.factory.connections), 1)
</span><span class="cx"> self.assertEquals(len(self.holders), 1)
</span><span class="cx"> [holder] = self.holders
</span><span class="lines">@@ -452,7 +452,7 @@
</span><span class="cx"> for txn in txns:
</span><span class="cx"> # Make sure rollback will actually be executed.
</span><span class="cx"> results = self.resultOf(txn.execSQL("maybe change something!"))
</span><del>- [[[counter, echo]]] = results
</del><ins>+ [[[_ignore_counter, echo]]] = results
</ins><span class="cx"> self.assertEquals("maybe change something!", echo)
</span><span class="cx"> # Fail one (and only one) call to rollback().
</span><span class="cx"> self.factory.rollbackFail = True
</span><span class="lines">@@ -483,7 +483,7 @@
</span><span class="cx"> """
</span><span class="cx"> active = []
</span><span class="cx"> # Use up the available connections ...
</span><del>- for i in xrange(self.pool.maxConnections):
</del><ins>+ for _ignore in xrange(self.pool.maxConnections):
</ins><span class="cx"> active.append(self.createTransaction())
</span><span class="cx">
</span><span class="cx"> # ... so that this one has to be spooled.
</span><span class="lines">@@ -506,7 +506,7 @@
</span><span class="cx"> abortResult = self.resultOf(it.abort())
</span><span class="cx">
</span><span class="cx"> # steal it from the queue so we can do it out of order
</span><del>- d, work = self.holders[0]._q.get()
</del><ins>+ d, _ignore_work = self.holders[0]._q.get()
</ins><span class="cx">
</span><span class="cx"> # that should be the only work unit so don't continue if something else
</span><span class="cx"> # got in there
</span><span class="lines">@@ -716,7 +716,7 @@
</span><span class="cx">
</span><span class="cx"> self.factory.connections[0].executeWillFail(CustomExecuteFailed)
</span><span class="cx"> results = self.resultOf(txn.execSQL("hello, world!"))
</span><del>- [[[counter, echo]]] = results
</del><ins>+ [[[_ignore_counter, echo]]] = results
</ins><span class="cx"> self.assertEquals("hello, world!", echo)
</span><span class="cx">
</span><span class="cx"> # Two execution attempts should have been made, one on each connection.
</span><span class="lines">@@ -752,7 +752,7 @@
</span><span class="cx"> )
</span><span class="cx"> results = self.resultOf(txn.execSQL("hello, world!"))
</span><span class="cx"> txn.commit()
</span><del>- [[[counter, echo]]] = results
</del><ins>+ [[[_ignore_counter, echo]]] = results
</ins><span class="cx"> self.assertEquals("hello, world!", echo)
</span><span class="cx"> txn2 = self.createTransaction()
</span><span class="cx"> self.assertEquals(
</span><span class="lines">@@ -767,7 +767,7 @@
</span><span class="cx"> self.factory.connections[0].executeWillFail(CustomExecFail)
</span><span class="cx"> results = self.resultOf(txn2.execSQL("second try!"))
</span><span class="cx"> txn2.commit()
</span><del>- [[[counter, echo]]] = results
</del><ins>+ [[[_ignore_counter, echo]]] = results
</ins><span class="cx"> self.assertEquals("second try!", echo)
</span><span class="cx"> self.assertEquals(len(self.flushLoggedErrors(CustomExecFail)), 1)
</span><span class="cx">
</span><span class="lines">@@ -936,7 +936,7 @@
</span><span class="cx"> re-connection on the next try.
</span><span class="cx"> """
</span><span class="cx"> txn = self.createTransaction()
</span><del>- [[[counter, echo]]] = self.resultOf(txn.execSQL("hello, world!", []))
</del><ins>+ [[[_ignore_counter, _ignore_echo]]] = self.resultOf(txn.execSQL("hello, world!", []))
</ins><span class="cx"> self.factory.connections[0].executeWillFail(ZeroDivisionError)
</span><span class="cx"> [f] = self.resultOf(txn.execSQL("divide by zero", []))
</span><span class="cx"> f.trap(self.translateError(ZeroDivisionError))
</span><span class="lines">@@ -966,7 +966,7 @@
</span><span class="cx"> """
</span><span class="cx"> txn = self.createTransaction()
</span><span class="cx"> results = self.resultOf(txn.execSQL("maybe change something!"))
</span><del>- [[[counter, echo]]] = results
</del><ins>+ [[[_ignore_counter, echo]]] = results
</ins><span class="cx"> self.assertEquals("maybe change something!", echo)
</span><span class="cx"> self.factory.rollbackFail = True
</span><span class="cx"> [x] = self.resultOf(txn.abort())
</span><span class="lines">@@ -992,7 +992,7 @@
</span><span class="cx"> txn = self.createTransaction()
</span><span class="cx"> self.factory.commitFail = True
</span><span class="cx"> results = self.resultOf(txn.execSQL("maybe change something!"))
</span><del>- [[[counter, echo]]] = results
</del><ins>+ [[[_ignore_counter, echo]]] = results
</ins><span class="cx"> self.assertEquals("maybe change something!", echo)
</span><span class="cx"> [x] = self.resultOf(txn.commit())
</span><span class="cx"> x.trap(self.translateError(CommitFail))
</span><span class="lines">@@ -1210,7 +1210,7 @@
</span><span class="cx"> r = self.resultOf(
</span><span class="cx"> txn.execSQL("some-rows", raiseOnZeroRowCount=RuntimeError)
</span><span class="cx"> )
</span><del>- [[[counter, echo]]] = r
</del><ins>+ [[[_ignore_counter, echo]]] = r
</ins><span class="cx"> self.assertEquals(echo, "some-rows")
</span><span class="cx">
</span><span class="cx">
</span><span class="lines">@@ -1338,7 +1338,6 @@
</span><span class="cx"> interacting with each other.
</span><span class="cx"> """
</span><span class="cx">
</span><del>-
</del><span class="cx"> def setParamstyle(self, paramstyle):
</span><span class="cx"> """
</span><span class="cx"> Change the paramstyle on both the pool and the client.
</span><span class="lines">@@ -1366,6 +1365,7 @@
</span><span class="cx"> self.assertEquals(len(self.factory.connections), 1)
</span><span class="cx">
</span><span class="cx">
</span><ins>+
</ins><span class="cx"> class HookableOperationTests(TestCase):
</span><span class="cx"> """
</span><span class="cx"> Tests for L{_HookableOperation}.
</span></span></pre></div>
<a id="twexttrunktwextenterprisetesttest_jobqueuepyfromrev12780twextbranchesuserscdaboojobstwextenterprisetesttest_jobqueuepy"></a>
<div class="copfile"><h4>Copied: twext/trunk/twext/enterprise/test/test_jobqueue.py (from rev 12780, twext/branches/users/cdaboo/jobs/twext/enterprise/test/test_jobqueue.py) (0 => 12781)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/test/test_jobqueue.py         (rev 0)
+++ twext/trunk/twext/enterprise/test/test_jobqueue.py        2014-03-01 16:17:48 UTC (rev 12781)
</span><span class="lines">@@ -0,0 +1,1015 @@
</span><ins>+##
+# Copyright (c) 2012-2014 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Tests for L{twext.enterprise.jobqueue}.
+"""
+
+import datetime
+
+from zope.interface.verify import verifyObject
+
+from twisted.trial.unittest import TestCase, SkipTest
+from twisted.test.proto_helpers import StringTransport, MemoryReactor
+from twisted.internet.defer import (
+ Deferred, inlineCallbacks, gatherResults, passthru, returnValue
+)
+from twisted.internet.task import Clock as _Clock
+from twisted.protocols.amp import Command, AMP, Integer
+from twisted.application.service import Service, MultiService
+
+from twext.enterprise.dal.syntax import SchemaSyntax, Select
+from twext.enterprise.dal.record import fromTable
+from twext.enterprise.dal.test.test_parseschema import SchemaTestHelper
+from twext.enterprise.fixtures import buildConnectionPool
+from twext.enterprise.fixtures import SteppablePoolHelper
+from twext.enterprise.jobqueue import (
+ inTransaction, PeerConnectionPool, astimestamp,
+ LocalPerformer, _IJobPerformer, WorkItem, WorkerConnectionPool,
+ ConnectionFromPeerNode, LocalQueuer,
+ _BaseQueuer, NonPerformingQueuer
+)
+import twext.enterprise.jobqueue
+
+# TODO: There should be a store-building utility within twext.enterprise.
+try:
+ from txdav.common.datastore.test.util import buildStore
+except ImportError:
+ def buildStore(*args, **kwargs):
+ raise SkipTest(
+ "buildStore is not available, because it's in txdav; duh."
+ )
+
+
+
+class Clock(_Clock):
+ """
+ More careful L{IReactorTime} fake which mimics the exception behavior of
+ the real reactor.
+ """
+
+ def callLater(self, _seconds, _f, *args, **kw):
+ if _seconds < 0:
+ raise ValueError("%s<0: " % (_seconds,))
+ return super(Clock, self).callLater(_seconds, _f, *args, **kw)
+
+
+
+class MemoryReactorWithClock(MemoryReactor, Clock):
+ """
+ Simulate a real reactor.
+ """
+ def __init__(self):
+ MemoryReactor.__init__(self)
+ Clock.__init__(self)
+
+
+
+def transactionally(transactionCreator):
+ """
+ Perform the decorated function immediately in a transaction, replacing its
+ name with a L{Deferred}.
+
+ Use like so::
+
+ @transactionally(connectionPool.connection)
+ @inlineCallbacks
+ def it(txn):
+ yield txn.doSomething()
+ it.addCallback(firedWhenDone)
+
+ @param transactionCreator: A 0-arg callable that returns an
+ L{IAsyncTransaction}.
+ """
+ def thunk(operation):
+ return inTransaction(transactionCreator, operation)
+ return thunk
+
+
+
+class UtilityTests(TestCase):
+ """
+ Tests for supporting utilities.
+ """
+
+ def test_inTransactionSuccess(self):
+ """
+ L{inTransaction} invokes its C{transactionCreator} argument, and then
+ returns a L{Deferred} which fires with the result of its C{operation}
+ argument when it succeeds.
+ """
+ class faketxn(object):
+ def __init__(self):
+ self.commits = []
+ self.aborts = []
+
+ def commit(self):
+ self.commits.append(Deferred())
+ return self.commits[-1]
+
+ def abort(self):
+ self.aborts.append(Deferred())
+ return self.aborts[-1]
+
+ createdTxns = []
+
+ def createTxn():
+ createdTxns.append(faketxn())
+ return createdTxns[-1]
+
+ dfrs = []
+
+ def operation(t):
+ self.assertIdentical(t, createdTxns[-1])
+ dfrs.append(Deferred())
+ return dfrs[-1]
+
+ d = inTransaction(createTxn, operation)
+ x = []
+ d.addCallback(x.append)
+ self.assertEquals(x, [])
+ self.assertEquals(len(dfrs), 1)
+ dfrs[0].callback(35)
+
+ # Commit in progress, so still no result...
+ self.assertEquals(x, [])
+ createdTxns[0].commits[0].callback(42)
+
+ # Committed, everything's done.
+ self.assertEquals(x, [35])
+
+
+
+class SimpleSchemaHelper(SchemaTestHelper):
+ def id(self):
+ return "worker"
+
+
+
+SQL = passthru
+
+nodeSchema = SQL(
+ """
+ create table NODE_INFO (
+ HOSTNAME varchar(255) not null,
+ PID integer not null,
+ PORT integer not null,
+ TIME timestamp default current_timestamp not null,
+ primary key (HOSTNAME, PORT)
+ );
+ """
+)
+
+jobSchema = SQL(
+ """
+ create table JOB (
+ JOB_ID integer primary key default 1,
+ WORK_TYPE varchar(255) not null,
+ PRIORITY integer default 0,
+ WEIGHT integer default 0,
+ NOT_BEFORE timestamp default null,
+ NOT_AFTER timestamp default null
+ );
+ """
+)
+
+schemaText = SQL(
+ """
+ create table DUMMY_WORK_ITEM (
+ WORK_ID integer primary key,
+ JOB_ID integer references JOB,
+ A integer, B integer,
+ DELETE_ON_LOAD integer default 0
+ );
+ create table DUMMY_WORK_DONE (
+ WORK_ID integer primary key,
+ JOB_ID integer references JOB,
+ A_PLUS_B integer
+ );
+ """
+)
+
+try:
+ schema = SchemaSyntax(SimpleSchemaHelper().schemaFromString(jobSchema + schemaText))
+
+ dropSQL = [
+ "drop table {name} cascade".format(name=table)
+ for table in ("DUMMY_WORK_ITEM", "DUMMY_WORK_DONE")
+ ] + ["delete from job"]
+except SkipTest as e:
+ DummyWorkDone = DummyWorkItem = object
+ skip = e
+else:
+ DummyWorkDone = fromTable(schema.DUMMY_WORK_DONE)
+ DummyWorkItem = fromTable(schema.DUMMY_WORK_ITEM)
+ skip = False
+
+
+
+class DummyWorkDone(WorkItem, DummyWorkDone):
+ """
+ Work result.
+ """
+
+
+
+class DummyWorkItem(WorkItem, DummyWorkItem):
+ """
+ Sample L{WorkItem} subclass that adds two integers together and stores them
+ in another table.
+ """
+
+ def doWork(self):
+ return DummyWorkDone.makeJob(
+ self.transaction, jobID=self.jobID + 100, workID=self.workID + 100, aPlusB=self.a + self.b
+ )
+
+
+ @classmethod
+ @inlineCallbacks
+ def loadForJob(cls, txn, *a):
+ """
+ Load L{DummyWorkItem} as normal... unless the loaded item has
+ C{DELETE_ON_LOAD} set, in which case, do a deletion of this same row in
+ a concurrent transaction, then commit it.
+ """
+ workItems = yield super(DummyWorkItem, cls).loadForJob(txn, *a)
+ if workItems[0].deleteOnLoad:
+ otherTransaction = txn.concurrently()
+ otherSelf = yield super(DummyWorkItem, cls).loadForJob(txn, *a)
+ yield otherSelf[0].delete()
+ yield otherTransaction.commit()
+ returnValue(workItems)
+
+
+
+class AMPTests(TestCase):
+ """
+ Tests for L{AMP} faithfully relaying ids across the wire.
+ """
+
+ def test_sendTableWithName(self):
+ """
+ An integer C{id} argument sent with L{AMP.callRemote} is delivered
+ unchanged to the receiving side's responder.
+ """
+ client = AMP()
+
+ class SampleCommand(Command):
+ arguments = [("id", Integer())]
+
+ class Receiver(AMP):
+ @SampleCommand.responder
+ def gotIt(self, id):
+ self.it = id
+ return {}
+
+ server = Receiver()
+ clientT = StringTransport()
+ serverT = StringTransport()
+ client.makeConnection(clientT)
+ server.makeConnection(serverT)
+ client.callRemote(SampleCommand, id=123)
+ server.dataReceived(clientT.io.getvalue())
+ self.assertEqual(server.it, 123)
+
+
+
+class WorkItemTests(TestCase):
+ """
+ A L{WorkItem} is an item of work that can be executed.
+ """
+
+ def test_forTableName(self):
+ """
+ L{WorkItem.forTableName} returns the L{WorkItem} subclass mapped to the
+ given table name.
+ """
+ self.assertIdentical(
+ WorkItem.forTableName(schema.DUMMY_WORK_ITEM.model.name), DummyWorkItem
+ )
+
+
+ @inlineCallbacks
+ def test_enqueue(self):
+ """
+ L{PeerConnectionPool.enqueueWork} will insert a job and a work item.
+ """
+ dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
+ fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
+ sinceEpoch = astimestamp(fakeNow)
+ clock = Clock()
+ clock.advance(sinceEpoch)
+ qpool = PeerConnectionPool(clock, dbpool.connection, 0)
+ realChoosePerformer = qpool.choosePerformer
+ performerChosen = []
+
+ def catchPerformerChoice():
+ result = realChoosePerformer()
+ performerChosen.append(True)
+ return result
+
+ qpool.choosePerformer = catchPerformerChoice
+
+ @transactionally(dbpool.connection)
+ def check(txn):
+ return qpool.enqueueWork(
+ txn, DummyWorkItem, a=3, b=9,
+ notBefore=datetime.datetime(2012, 12, 13, 12, 12, 0)
+ )
+
+ proposal = yield check
+ yield proposal.whenProposed()
+
+ # Make sure we have one JOB and one DUMMY_WORK_ITEM
+ @transactionally(dbpool.connection)
+ def checkJob(txn):
+ return Select(
+ From=schema.JOB
+ ).on(txn)
+
+ jobs = yield checkJob
+ self.assertTrue(len(jobs) == 1)
+ self.assertTrue(jobs[0][1] == "DUMMY_WORK_ITEM")
+
+ @transactionally(dbpool.connection)
+ def checkWork(txn):
+ return Select(
+ From=schema.DUMMY_WORK_ITEM
+ ).on(txn)
+
+ work = yield checkWork
+ self.assertTrue(len(work) == 1)
+ self.assertTrue(work[0][1] == jobs[0][0])
+
+
+
+class WorkerConnectionPoolTests(TestCase):
+ """
+ A L{WorkerConnectionPool} is responsible for managing, in a node's
+ controller (master) process, the collection of worker (slave) processes
+ that are capable of executing queue work.
+ """
+
+
+
+class WorkProposalTests(TestCase):
+ """
+ Tests for L{WorkProposal}.
+ """
+
+ @inlineCallbacks
+ def test_whenProposedSuccess(self):
+ """
+ The L{Deferred} returned by L{WorkProposal.whenProposed} fires when the
+ SQL sent to the database has completed.
+ """
+ cph = SteppablePoolHelper(nodeSchema + jobSchema + schemaText)
+ cph.setUp(test=self)
+ lq = LocalQueuer(cph.createTransaction)
+ enqTxn = cph.createTransaction()
+ wp = yield lq.enqueueWork(enqTxn, DummyWorkItem, a=3, b=4)
+ r = yield wp.whenProposed()
+ self.assertEquals(r, wp)
+
+
+ def test_whenProposedFailure(self):
+ """
+ The L{Deferred} returned by L{WorkProposal.whenProposed} fails with an
+ errback when the SQL executed to create the WorkItem row fails.
+ """
+ cph = SteppablePoolHelper(nodeSchema + jobSchema + schemaText)
+ cph.setUp(self)
+ enqTxn = cph.createTransaction()
+ lq = LocalQueuer(cph.createTransaction)
+ self.failUnlessFailure(lq.enqueueWork(enqTxn, DummyWorkItem, a=3, b=4, bogus=3), TypeError)
+ enqTxn.abort()
+ self.flushLoggedErrors()
+
+
+
+class PeerConnectionPoolUnitTests(TestCase):
+ """
+ L{PeerConnectionPool} has many internal components.
+ """
+ def setUp(self):
+ """
+ Create a L{PeerConnectionPool} that is just initialized enough.
+ """
+ self.pcp = PeerConnectionPool(None, None, 4321)
+
+
+ def checkPerformer(self, cls):
+ """
+ Verify that L{PeerConnectionPool.choosePerformer} returns an instance
+ of C{cls} that provides L{_IJobPerformer}.
+ """
+ performer = self.pcp.choosePerformer()
+ self.failUnlessIsInstance(performer, cls)
+ verifyObject(_IJobPerformer, performer)
+
+
+ def test_choosingPerformerWhenNoPeersAndNoWorkers(self):
+ """
+ If L{PeerConnectionPool.choosePerformer} is invoked when no workers
+ have spawned and no peers have established connections (either incoming
+ or outgoing), then it chooses an implementation of C{performJob} that
+ simply executes the work locally.
+ """
+ self.checkPerformer(LocalPerformer)
+
+
+ def test_choosingPerformerWithLocalCapacity(self):
+ """
+ If L{PeerConnectionPool.choosePerformer} is invoked when some workers
+ have spawned, then it should choose the worker pool as the local
+ performer.
+ """
+ # Give it some local capacity.
+ wlf = self.pcp.workerListenerFactory()
+ proto = wlf.buildProtocol(None)
+ proto.makeConnection(StringTransport())
+ # Sanity check.
+ self.assertEqual(len(self.pcp.workerPool.workers), 1)
+ self.assertEqual(self.pcp.workerPool.hasAvailableCapacity(), True)
+ # Now it has some capacity.
+ self.checkPerformer(WorkerConnectionPool)
+
+
+ def test_choosingPerformerFromNetwork(self):
+ """
+ If L{PeerConnectionPool.choosePerformer} is invoked when no workers
+ have spawned but some peers have connected, then it should choose a
+ connection from the network to perform it.
+ """
+ peer = PeerConnectionPool(None, None, 4322)
+ local = self.pcp.peerFactory().buildProtocol(None)
+ remote = peer.peerFactory().buildProtocol(None)
+ connection = Connection(local, remote)
+ connection.start()
+ self.checkPerformer(ConnectionFromPeerNode)
+
+
+ def test_performingWorkOnNetwork(self):
+ """
+ The L{performJob} command will get relayed to the remote peer
+ controller.
+ """
+ peer = PeerConnectionPool(None, None, 4322)
+ local = self.pcp.peerFactory().buildProtocol(None)
+ remote = peer.peerFactory().buildProtocol(None)
+ connection = Connection(local, remote)
+ connection.start()
+ d = Deferred()
+
+ class DummyPerformer(object):
+ def performJob(self, jobID):
+ self.jobID = jobID
+ return d
+
+ # Doing real database I/O in this test would be tedious so fake the
+ # first method in the call stack which actually talks to the DB.
+ dummy = DummyPerformer()
+
+ def chooseDummy(onlyLocally=False):
+ return dummy
+
+ peer.choosePerformer = chooseDummy
+ performed = local.performJob(7384)
+ performResult = []
+ performed.addCallback(performResult.append)
+
+ # Sanity check.
+ self.assertEquals(performResult, [])
+ connection.flush()
+ self.assertEquals(dummy.jobID, 7384)
+ self.assertEquals(performResult, [])
+ d.callback(128374)
+ connection.flush()
+ self.assertEquals(performResult, [None])
+
+
+ def test_choosePerformerSorted(self):
+ """
+ L{PeerConnectionPool.choosePerformer} returns the connected peer
+ with the lowest current load estimate.
+ """
+ peer = PeerConnectionPool(None, None, 4322)
+
+ class DummyPeer(object):
+ def __init__(self, name, load):
+ self.name = name
+ self.load = load
+
+ def currentLoadEstimate(self):
+ return self.load
+
+ apeer = DummyPeer("A", 1)
+ bpeer = DummyPeer("B", 0)
+ cpeer = DummyPeer("C", 2)
+ peer.addPeerConnection(apeer)
+ peer.addPeerConnection(bpeer)
+ peer.addPeerConnection(cpeer)
+
+ performer = peer.choosePerformer(onlyLocally=False)
+ self.assertEqual(performer, bpeer)
+
+ bpeer.load = 2
+ performer = peer.choosePerformer(onlyLocally=False)
+ self.assertEqual(performer, apeer)
+
+
+ @inlineCallbacks
+ def test_notBeforeWhenCheckingForLostWork(self):
+ """
+ L{PeerConnectionPool._periodicLostWorkCheck} should execute any
+ outstanding work items, but only those that are expired.
+ """
+ dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
+ # An arbitrary point in time.
+ fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
+ # *why* does datetime still not have .astimestamp()
+ sinceEpoch = astimestamp(fakeNow)
+ clock = Clock()
+ clock.advance(sinceEpoch)
+ qpool = PeerConnectionPool(clock, dbpool.connection, 0)
+
+ # Let's create a couple of work items directly, not via the enqueue
+ # method, so that they exist but nobody will try to immediately execute
+ # them.
+
+ @transactionally(dbpool.connection)
+ @inlineCallbacks
+ def setup(txn):
+ # First, one that's right now.
+ yield DummyWorkItem.makeJob(txn, a=1, b=2, notBefore=fakeNow)
+
+ # Next, create one that's actually far enough into the past to run.
+ yield DummyWorkItem.makeJob(
+ txn, a=3, b=4, notBefore=(
+ # Schedule it in the past so that it should have already
+ # run.
+ fakeNow - datetime.timedelta(
+ seconds=qpool.queueProcessTimeout + 20
+ )
+ )
+ )
+
+ # Finally, one that's actually scheduled for the future.
+ yield DummyWorkItem.makeJob(
+ txn, a=10, b=20, notBefore=fakeNow + datetime.timedelta(1000)
+ )
+ yield setup
+ yield qpool._periodicLostWorkCheck()
+
+ @transactionally(dbpool.connection)
+ def check(txn):
+ return DummyWorkDone.all(txn)
+
+ every = yield check
+ self.assertEquals([x.aPlusB for x in every], [7])
+
+
+ @inlineCallbacks
+ def test_notBeforeWhenEnqueueing(self):
+ """
+ L{PeerConnectionPool.enqueueWork} enqueues some work immediately, but
+ only executes it when enough time has elapsed to allow the C{notBefore}
+ attribute of the given work item to have passed.
+ """
+ dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
+ fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
+ sinceEpoch = astimestamp(fakeNow)
+ clock = Clock()
+ clock.advance(sinceEpoch)
+ qpool = PeerConnectionPool(clock, dbpool.connection, 0)
+ realChoosePerformer = qpool.choosePerformer
+ performerChosen = []
+
+ def catchPerformerChoice():
+ result = realChoosePerformer()
+ performerChosen.append(True)
+ return result
+
+ qpool.choosePerformer = catchPerformerChoice
+
+ @transactionally(dbpool.connection)
+ def check(txn):
+ return qpool.enqueueWork(
+ txn, DummyWorkItem, a=3, b=9,
+ notBefore=datetime.datetime(2012, 12, 12, 12, 12, 20)
+ )
+
+ proposal = yield check
+ yield proposal.whenProposed()
+
+ # This is going to schedule the work to happen with some asynchronous
+ # I/O in the middle; this is a problem because how do we know when it's
+ # time to check to see if the work has started? We need to intercept
+ # the thing that kicks off the work; we can then wait for the work
+ # itself.
+
+ self.assertEquals(performerChosen, [])
+
+ # Advance to exactly the appointed second.
+ clock.advance(20 - 12)
+ self.assertEquals(performerChosen, [True])
+
+ # FIXME: if this fails, it will hang, but that's better than no
+ # notification that it is broken at all.
+
+ result = yield proposal.whenExecuted()
+ self.assertIdentical(result, proposal)
+
+
+ @inlineCallbacks
+ def test_notBeforeBefore(self):
+ """
+ L{PeerConnectionPool.enqueueWork} will execute its work immediately if
+ the C{notBefore} attribute of the work item in question is in the past.
+ """
+ dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
+ fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
+ sinceEpoch = astimestamp(fakeNow)
+ clock = Clock()
+ clock.advance(sinceEpoch)
+ qpool = PeerConnectionPool(clock, dbpool.connection, 0)
+ realChoosePerformer = qpool.choosePerformer
+ performerChosen = []
+
+ def catchPerformerChoice():
+ result = realChoosePerformer()
+ performerChosen.append(True)
+ return result
+
+ qpool.choosePerformer = catchPerformerChoice
+
+ @transactionally(dbpool.connection)
+ def check(txn):
+ return qpool.enqueueWork(
+ txn, DummyWorkItem, a=3, b=9,
+ notBefore=datetime.datetime(2012, 12, 12, 12, 12, 0)
+ )
+
+ proposal = yield check
+ yield proposal.whenProposed()
+
+ clock.advance(1000)
+ # Advance far beyond the given timestamp.
+ self.assertEquals(performerChosen, [True])
+
+ result = yield proposal.whenExecuted()
+ self.assertIdentical(result, proposal)
+
+
+ def test_workerConnectionPoolPerformJob(self):
+ """
+ L{WorkerConnectionPool.performJob} performs work by selecting a
+ L{ConnectionFromWorker} and sending it a L{PerformJOB} command.
+ """
+ clock = Clock()
+ peerPool = PeerConnectionPool(clock, None, 4322)
+ factory = peerPool.workerListenerFactory()
+
+ def peer():
+ p = factory.buildProtocol(None)
+ t = StringTransport()
+ p.makeConnection(t)
+ return p, t
+
+ worker1, _ignore_trans1 = peer()
+ worker2, _ignore_trans2 = peer()
+
+ # Ask the worker to do something.
+ worker1.performJob(1)
+ self.assertEquals(worker1.currentLoad, 1)
+ self.assertEquals(worker2.currentLoad, 0)
+
+ # Now ask the pool to do something
+ peerPool.workerPool.performJob(2)
+ self.assertEquals(worker1.currentLoad, 1)
+ self.assertEquals(worker2.currentLoad, 1)
+
+
+ def test_poolStartServiceChecksForWork(self):
+ """
+ L{PeerConnectionPool.startService} kicks off the idle work-check loop.
+ """
+ reactor = MemoryReactorWithClock()
+ cph = SteppablePoolHelper(nodeSchema + jobSchema + schemaText)
+ then = datetime.datetime(2012, 12, 12, 12, 12, 0)
+ reactor.advance(astimestamp(then))
+ cph.setUp(self)
+ pcp = PeerConnectionPool(reactor, cph.pool.connection, 4321)
+ now = then + datetime.timedelta(seconds=pcp.queueProcessTimeout * 2)
+
+ @transactionally(cph.pool.connection)
+ def createOldWork(txn):
+ one = DummyWorkItem.makeJob(txn, jobID=100, workID=1, a=3, b=4, notBefore=then)
+ two = DummyWorkItem.makeJob(txn, jobID=101, workID=2, a=7, b=9, notBefore=now)
+ return gatherResults([one, two])
+
+ pcp.startService()
+ cph.flushHolders()
+ reactor.advance(pcp.queueProcessTimeout * 2)
+ self.assertEquals(
+ cph.rows("select * from DUMMY_WORK_DONE"),
+ [(101, 200, 7)]
+ )
+ cph.rows("delete from DUMMY_WORK_DONE")
+ reactor.advance(pcp.queueProcessTimeout * 2)
+ self.assertEquals(
+ cph.rows("select * from DUMMY_WORK_DONE"),
+ [(102, 201, 16)]
+ )
+
+
+
+class HalfConnection(object):
+ def __init__(self, protocol):
+ self.protocol = protocol
+ self.transport = StringTransport()
+
+
+ def start(self):
+ """
+ Hook up the protocol and the transport.
+ """
+ self.protocol.makeConnection(self.transport)
+
+
+ def extract(self):
+ """
+ Extract the data currently present in this protocol's output buffer.
+ """
+ io = self.transport.io
+ value = io.getvalue()
+ io.seek(0)
+ io.truncate()
+ return value
+
+
+ def deliver(self, data):
+ """
+ Deliver the given data to this L{HalfConnection}'s protocol's
+ C{dataReceived} method.
+
+ @return: a boolean indicating whether any data was delivered.
+ @rtype: L{bool}
+ """
+ if data:
+ self.protocol.dataReceived(data)
+ return True
+ return False
+
+
+
+class Connection(object):
+
+ def __init__(self, local, remote):
+ """
+ Connect two protocol instances to each other via string transports.
+ """
+ self.receiver = HalfConnection(local)
+ self.sender = HalfConnection(remote)
+
+
+ def start(self):
+ """
+ Start up the connection.
+ """
+ self.sender.start()
+ self.receiver.start()
+
+
+ def pump(self):
+ """
+ Relay data in one direction between the two connections.
+ """
+ result = self.receiver.deliver(self.sender.extract())
+ self.receiver, self.sender = self.sender, self.receiver
+ return result
+
+
+ def flush(self, turns=10):
+ """
+ Keep relaying data until there's no more.
+ """
+ for _ignore_x in range(turns):
+ if not (self.pump() or self.pump()):
+ return
+
+
+
+class PeerConnectionPoolIntegrationTests(TestCase):
+ """
+ L{PeerConnectionPool} is the service responsible for coordinating
+ eventually-consistent task queuing within a cluster.
+ """
+
+ @inlineCallbacks
+ def setUp(self):
+ """
+ L{PeerConnectionPool} requires access to a database and the reactor.
+ """
+ self.store = yield buildStore(self, None)
+
+ def doit(txn):
+ return txn.execSQL(schemaText)
+
+ yield inTransaction(
+ lambda: self.store.newTransaction("bonus schema"), doit
+ )
+
+ def indirectedTransactionFactory(*a):
+ """
+ Allow tests to replace "self.store.newTransaction" to provide
+ fixtures with extra methods on a test-by-test basis.
+ """
+ return self.store.newTransaction(*a)
+
+ def deschema():
+ @inlineCallbacks
+ def deletestuff(txn):
+ for stmt in dropSQL:
+ yield txn.execSQL(stmt)
+ return inTransaction(
+ lambda *a: self.store.newTransaction(*a), deletestuff
+ )
+ self.addCleanup(deschema)
+
+ from twisted.internet import reactor
+ self.node1 = PeerConnectionPool(
+ reactor, indirectedTransactionFactory, 0)
+ self.node2 = PeerConnectionPool(
+ reactor, indirectedTransactionFactory, 0)
+
+ class FireMeService(Service, object):
+ def __init__(self, d):
+ super(FireMeService, self).__init__()
+ self.d = d
+
+ def startService(self):
+ self.d.callback(None)
+
+ d1 = Deferred()
+ d2 = Deferred()
+ FireMeService(d1).setServiceParent(self.node1)
+ FireMeService(d2).setServiceParent(self.node2)
+ ms = MultiService()
+ self.node1.setServiceParent(ms)
+ self.node2.setServiceParent(ms)
+ ms.startService()
+ self.addCleanup(ms.stopService)
+ yield gatherResults([d1, d2])
+ self.store.queuer = self.node1
+
+
+ def test_currentNodeInfo(self):
+ """
+ There will be two C{NODE_INFO} rows in the database, retrievable as two
+ L{NodeInfo} objects, once both nodes have started up.
+ """
+ @inlineCallbacks
+ def check(txn):
+ self.assertEquals(len((yield self.node1.activeNodes(txn))), 2)
+ self.assertEquals(len((yield self.node2.activeNodes(txn))), 2)
+ return inTransaction(self.store.newTransaction, check)
+
+
+ @inlineCallbacks
+ def test_enqueueHappyPath(self):
+ """
+ When a L{WorkItem} is scheduled for execution via
+ L{PeerConnectionPool.enqueueWork} its C{doWork} method will be invoked
+ by the time the L{Deferred} returned from the resulting
+ L{WorkProposal}'s C{whenExecuted} method has fired.
+ """
+ # TODO: this exact test should run against LocalQueuer as well.
+ def operation(txn):
+ # TODO: how does "enqueue" get associated with the transaction?
+ # This is not the case with a raw t.w.enterprise transaction.
+ # Should probably do something with components.
+ return txn.enqueue(DummyWorkItem, a=3, b=4, jobID=100, workID=1,
+ notBefore=datetime.datetime.utcnow())
+ result = yield inTransaction(self.store.newTransaction, operation)
+ # Wait for it to be executed. Hopefully this does not time out :-\.
+ yield result.whenExecuted()
+
+ def op2(txn):
+ return Select(
+ [
+ schema.DUMMY_WORK_DONE.WORK_ID,
+ schema.DUMMY_WORK_DONE.JOB_ID,
+ schema.DUMMY_WORK_DONE.A_PLUS_B,
+ ],
+ From=schema.DUMMY_WORK_DONE
+ ).on(txn)
+
+ rows = yield inTransaction(self.store.newTransaction, op2)
+ self.assertEquals(rows, [[101, 200, 7]])
+
+
+ @inlineCallbacks
+ def test_noWorkDoneWhenConcurrentlyDeleted(self):
+ """
+ When a L{WorkItem} is concurrently deleted by another transaction, it
+ should I{not} perform its work.
+ """
+ # Give every transaction created by the store a "concurrently" method.
+ original = self.store.newTransaction
+
+ def decorate(*a, **k):
+ result = original(*a, **k)
+ result.concurrently = self.store.newTransaction
+ return result
+
+ self.store.newTransaction = decorate
+
+ def operation(txn):
+ return txn.enqueue(
+ DummyWorkItem, a=30, b=40, workID=5678,
+ deleteOnLoad=1,
+ notBefore=datetime.datetime.utcnow()
+ )
+
+ proposal = yield inTransaction(self.store.newTransaction, operation)
+ yield proposal.whenExecuted()
+
+ # Sanity check on the concurrent deletion.
+ def op2(txn):
+ return Select(
+ [schema.DUMMY_WORK_ITEM.WORK_ID],
+ From=schema.DUMMY_WORK_ITEM
+ ).on(txn)
+
+ rows = yield inTransaction(self.store.newTransaction, op2)
+ self.assertEquals(rows, [])
+
+ def op3(txn):
+ return Select(
+ [
+ schema.DUMMY_WORK_DONE.WORK_ID,
+ schema.DUMMY_WORK_DONE.A_PLUS_B,
+ ],
+ From=schema.DUMMY_WORK_DONE
+ ).on(txn)
+
+ rows = yield inTransaction(self.store.newTransaction, op3)
+ self.assertEquals(rows, [])
+
+
+
+class DummyProposal(object):
+
+ def __init__(self, *ignored):
+ pass
+
+
+ def _start(self):
+ pass
+
+
+
+class BaseQueuerTests(TestCase):
+
+ def setUp(self):
+ self.proposal = None
+ self.patch(twext.enterprise.jobqueue, "WorkProposal", DummyProposal)
+
+
+ def _proposalCallback(self, proposal):
+ self.proposal = proposal
+
+
+ @inlineCallbacks
+ def test_proposalCallbacks(self):
+ queuer = _BaseQueuer()
+ queuer.callWithNewProposals(self._proposalCallback)
+ self.assertEqual(self.proposal, None)
+ yield queuer.enqueueWork(None, None)
+ self.assertNotEqual(self.proposal, None)
+
+
+
+class NonPerformingQueuerTests(TestCase):
+
+ @inlineCallbacks
+ def test_choosePerformer(self):
+ queuer = NonPerformingQueuer()
+ performer = queuer.choosePerformer()
+ result = (yield performer.performJob(None))
+ self.assertEquals(result, None)
</ins></span></pre>
</div>
</div>
</body>
</html>