[CalendarServer-changes] [15253] twext/trunk/twext

source_changes at macosforge.org source_changes at macosforge.org
Fri Oct 30 07:40:20 PDT 2015


Revision: 15253
          http://trac.calendarserver.org//changeset/15253
Author:   cdaboo at apple.com
Date:     2015-10-30 07:40:20 -0700 (Fri, 30 Oct 2015)
Log Message:
-----------
Merge thread deadlock fix from PyPy branch to trunk.

Modified Paths:
--------------
    twext/trunk/twext/enterprise/adbapi2.py
    twext/trunk/twext/enterprise/fixtures.py
    twext/trunk/twext/enterprise/jobs/test/test_jobs.py
    twext/trunk/twext/internet/threadutils.py

Modified: twext/trunk/twext/enterprise/adbapi2.py
===================================================================
--- twext/trunk/twext/enterprise/adbapi2.py	2015-10-29 19:04:13 UTC (rev 15252)
+++ twext/trunk/twext/enterprise/adbapi2.py	2015-10-30 14:40:20 UTC (rev 15253)
@@ -751,6 +751,7 @@
         """
         spooledBase = self._baseTxn
         self._baseTxn = baseTxn
+        self._baseTxn._label = self._label
         spooledBase._unspool(baseTxn)
 
 
@@ -1046,6 +1047,7 @@
         def _reallyClose():
             if self._connection:
                 self._connection.close()
+                self._connection = None
 
         self._holder.submit(_reallyClose)
 

Modified: twext/trunk/twext/enterprise/fixtures.py
===================================================================
--- twext/trunk/twext/enterprise/fixtures.py	2015-10-29 19:04:13 UTC (rev 15252)
+++ twext/trunk/twext/enterprise/fixtures.py	2015-10-30 14:40:20 UTC (rev 15253)
@@ -27,10 +27,10 @@
 from zope.interface.verify import verifyClass
 
 from twisted.internet.interfaces import IReactorThreads
+from twisted.internet.defer import Deferred
+from twisted.internet.task import Clock
 from twisted.python.threadpool import ThreadPool
 
-from twisted.internet.task import Clock
-
 from twext.enterprise.adbapi2 import ConnectionPool
 from twext.enterprise.ienterprise import SQLITE_DIALECT
 from twext.enterprise.ienterprise import POSTGRES_DIALECT
@@ -118,6 +118,10 @@
         return super(FakeThreadHolder, self).start()
 
 
+    def retry(self):
+        pass
+
+
     def stop(self):
         result = super(FakeThreadHolder, self).stop()
         self.stopped = True
@@ -147,6 +151,10 @@
         self._q_ = newq
 
 
+    def callLater(self, f, *a, **k):
+        return Deferred()
+
+
     def callFromThread(self, f, *a, **k):
         result = f(*a, **k)
         return result

Modified: twext/trunk/twext/enterprise/jobs/test/test_jobs.py
===================================================================
--- twext/trunk/twext/enterprise/jobs/test/test_jobs.py	2015-10-29 19:04:13 UTC (rev 15252)
+++ twext/trunk/twext/enterprise/jobs/test/test_jobs.py	2015-10-30 14:40:20 UTC (rev 15253)
@@ -1655,7 +1655,8 @@
         @transactionally(self.store.newTransaction)
         def _enqueue(txn):
             return txn.enqueue(
-                DummyWorkPauseItem, a=30, b=40, workID=1
+                DummyWorkPauseItem, a=30, b=40, workID=1,
+                notBefore=datetime.datetime.utcnow() + datetime.timedelta(seconds=1)
             )
         yield _enqueue
 
@@ -1740,7 +1741,8 @@
         @transactionally(self.store.newTransaction)
         def _enqueue(txn):
             return txn.enqueue(
-                DummyWorkPauseItem, a=30, b=40, workID=1
+                DummyWorkPauseItem, a=30, b=40, workID=1,
+                notBefore=datetime.datetime.utcnow() + datetime.timedelta(seconds=1)
             )
         yield _enqueue
 
@@ -1810,7 +1812,8 @@
         @transactionally(self.store.newTransaction)
         def _enqueue(txn):
             return txn.enqueue(
-                DummyWorkPauseItem, a=30, b=40, workID=1
+                DummyWorkPauseItem, a=30, b=40, workID=1,
+                notBefore=datetime.datetime.utcnow() + datetime.timedelta(seconds=1)
             )
         yield _enqueue
 

Modified: twext/trunk/twext/internet/threadutils.py
===================================================================
--- twext/trunk/twext/internet/threadutils.py	2015-10-29 19:04:13 UTC (rev 15252)
+++ twext/trunk/twext/internet/threadutils.py	2015-10-30 14:40:20 UTC (rev 15253)
@@ -24,9 +24,10 @@
 
 _DONE = object()
 
-_STATE_STOPPED = 'STOPPED'
+_STATE_STARTING = 'STARTING'
 _STATE_RUNNING = 'RUNNING'
 _STATE_STOPPING = 'STOPPING'
+_STATE_STOPPED = 'STOPPED'
 
 class ThreadHolder(object):
     """
@@ -39,12 +40,14 @@
         self._state = _STATE_STOPPED
         self._stopper = None
         self._q = None
+        self._retryCallback = None
 
 
     def _run(self):
         """
         Worker function which runs in a non-reactor thread.
         """
+        self._state = _STATE_RUNNING
         while self._qpull():
             pass
 
@@ -90,7 +93,7 @@
 
         @return: L{Deferred} that fires with the result of L{work}
         """
-        if self._state != _STATE_RUNNING:
+        if self._state not in (_STATE_RUNNING, _STATE_STARTING):
             raise RuntimeError("not running")
         d = Deferred()
         self._q.put((d, work))
@@ -103,18 +106,31 @@
         """
         if self._state != _STATE_STOPPED:
             raise RuntimeError("Not stopped.")
-        self._state = _STATE_RUNNING
+        self._state = _STATE_STARTING
         self._q = Queue(0)
         self._reactor.callInThread(self._run)
+        self.retry()
 
 
+    def retry(self):
+        if self._state == _STATE_STARTING:
+            if self._retryCallback is not None:
+                self._reactor.threadpool._startSomeWorkers()
+            self._retryCallback = self._reactor.callLater(0.1, self.retry)
+        else:
+            self._retryCallback = None
+
+
     def stop(self):
         """
         Stop this thing and release its thread, if it's running.
         """
-        if self._state != _STATE_RUNNING:
+        if self._state not in (_STATE_RUNNING, _STATE_STARTING):
             raise RuntimeError("Not running.")
         s = self._stopper = Deferred()
         self._state = _STATE_STOPPING
         self._q.put(_DONE)
+        if self._retryCallback:
+            self._retryCallback.cancel()
+            self._retryCallback = None
         return s
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20151030/5d4e5771/attachment.html>


More information about the calendarserver-changes mailing list