[CalendarServer-changes] [15253] twext/trunk/twext
source_changes at macosforge.org
source_changes at macosforge.org
Fri Oct 30 07:40:20 PDT 2015
Revision: 15253
http://trac.calendarserver.org//changeset/15253
Author: cdaboo at apple.com
Date: 2015-10-30 07:40:20 -0700 (Fri, 30 Oct 2015)
Log Message:
-----------
M<erge thread deadlock fix from PyPy branch to trunk.
Modified Paths:
--------------
twext/trunk/twext/enterprise/adbapi2.py
twext/trunk/twext/enterprise/fixtures.py
twext/trunk/twext/enterprise/jobs/test/test_jobs.py
twext/trunk/twext/internet/threadutils.py
Modified: twext/trunk/twext/enterprise/adbapi2.py
===================================================================
--- twext/trunk/twext/enterprise/adbapi2.py 2015-10-29 19:04:13 UTC (rev 15252)
+++ twext/trunk/twext/enterprise/adbapi2.py 2015-10-30 14:40:20 UTC (rev 15253)
@@ -751,6 +751,7 @@
"""
spooledBase = self._baseTxn
self._baseTxn = baseTxn
+ self._baseTxn._label = self._label
spooledBase._unspool(baseTxn)
@@ -1046,6 +1047,7 @@
def _reallyClose():
if self._connection:
self._connection.close()
+ self._connection = None
self._holder.submit(_reallyClose)
Modified: twext/trunk/twext/enterprise/fixtures.py
===================================================================
--- twext/trunk/twext/enterprise/fixtures.py 2015-10-29 19:04:13 UTC (rev 15252)
+++ twext/trunk/twext/enterprise/fixtures.py 2015-10-30 14:40:20 UTC (rev 15253)
@@ -27,10 +27,10 @@
from zope.interface.verify import verifyClass
from twisted.internet.interfaces import IReactorThreads
+from twisted.internet.defer import Deferred
+from twisted.internet.task import Clock
from twisted.python.threadpool import ThreadPool
-from twisted.internet.task import Clock
-
from twext.enterprise.adbapi2 import ConnectionPool
from twext.enterprise.ienterprise import SQLITE_DIALECT
from twext.enterprise.ienterprise import POSTGRES_DIALECT
@@ -118,6 +118,10 @@
return super(FakeThreadHolder, self).start()
+ def retry(self):
+ pass
+
+
def stop(self):
result = super(FakeThreadHolder, self).stop()
self.stopped = True
@@ -147,6 +151,10 @@
self._q_ = newq
+ def callLater(self, f, *a, **k):
+ return Deferred()
+
+
def callFromThread(self, f, *a, **k):
result = f(*a, **k)
return result
Modified: twext/trunk/twext/enterprise/jobs/test/test_jobs.py
===================================================================
--- twext/trunk/twext/enterprise/jobs/test/test_jobs.py 2015-10-29 19:04:13 UTC (rev 15252)
+++ twext/trunk/twext/enterprise/jobs/test/test_jobs.py 2015-10-30 14:40:20 UTC (rev 15253)
@@ -1655,7 +1655,8 @@
@transactionally(self.store.newTransaction)
def _enqueue(txn):
return txn.enqueue(
- DummyWorkPauseItem, a=30, b=40, workID=1
+ DummyWorkPauseItem, a=30, b=40, workID=1,
+ notBefore=datetime.datetime.utcnow() + datetime.timedelta(seconds=1)
)
yield _enqueue
@@ -1740,7 +1741,8 @@
@transactionally(self.store.newTransaction)
def _enqueue(txn):
return txn.enqueue(
- DummyWorkPauseItem, a=30, b=40, workID=1
+ DummyWorkPauseItem, a=30, b=40, workID=1,
+ notBefore=datetime.datetime.utcnow() + datetime.timedelta(seconds=1)
)
yield _enqueue
@@ -1810,7 +1812,8 @@
@transactionally(self.store.newTransaction)
def _enqueue(txn):
return txn.enqueue(
- DummyWorkPauseItem, a=30, b=40, workID=1
+ DummyWorkPauseItem, a=30, b=40, workID=1,
+ notBefore=datetime.datetime.utcnow() + datetime.timedelta(seconds=1)
)
yield _enqueue
Modified: twext/trunk/twext/internet/threadutils.py
===================================================================
--- twext/trunk/twext/internet/threadutils.py 2015-10-29 19:04:13 UTC (rev 15252)
+++ twext/trunk/twext/internet/threadutils.py 2015-10-30 14:40:20 UTC (rev 15253)
@@ -24,9 +24,10 @@
_DONE = object()
-_STATE_STOPPED = 'STOPPED'
+_STATE_STARTING = 'STARTING'
_STATE_RUNNING = 'RUNNING'
_STATE_STOPPING = 'STOPPING'
+_STATE_STOPPED = 'STOPPED'
class ThreadHolder(object):
"""
@@ -39,12 +40,14 @@
self._state = _STATE_STOPPED
self._stopper = None
self._q = None
+ self._retryCallback = None
def _run(self):
"""
Worker function which runs in a non-reactor thread.
"""
+ self._state = _STATE_RUNNING
while self._qpull():
pass
@@ -90,7 +93,7 @@
@return: L{Deferred} that fires with the result of L{work}
"""
- if self._state != _STATE_RUNNING:
+ if self._state not in (_STATE_RUNNING, _STATE_STARTING):
raise RuntimeError("not running")
d = Deferred()
self._q.put((d, work))
@@ -103,18 +106,31 @@
"""
if self._state != _STATE_STOPPED:
raise RuntimeError("Not stopped.")
- self._state = _STATE_RUNNING
+ self._state = _STATE_STARTING
self._q = Queue(0)
self._reactor.callInThread(self._run)
+ self.retry()
+ def retry(self):
+ if self._state == _STATE_STARTING:
+ if self._retryCallback is not None:
+ self._reactor.threadpool._startSomeWorkers()
+ self._retryCallback = self._reactor.callLater(0.1, self.retry)
+ else:
+ self._retryCallback = None
+
+
def stop(self):
"""
Stop this thing and release its thread, if it's running.
"""
- if self._state != _STATE_RUNNING:
+ if self._state not in (_STATE_RUNNING, _STATE_STARTING):
raise RuntimeError("Not running.")
s = self._stopper = Deferred()
self._state = _STATE_STOPPING
self._q.put(_DONE)
+ if self._retryCallback:
+ self._retryCallback.cancel()
+ self._retryCallback = None
return s
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20151030/5d4e5771/attachment.html>
More information about the calendarserver-changes
mailing list