[CalendarServer-changes] [15023] twext/branches/users/cdaboo/cfod/twext

source_changes at macosforge.org source_changes at macosforge.org
Mon Aug 3 09:39:27 PDT 2015


Revision: 15023
          http://trac.calendarserver.org//changeset/15023
Author:   cdaboo at apple.com
Date:     2015-08-03 09:39:27 -0700 (Mon, 03 Aug 2015)
Log Message:
-----------
Fix PyPy thread deadlock issue. Other PyPy test fixes.

Modified Paths:
--------------
    twext/branches/users/cdaboo/cfod/twext/enterprise/adbapi2.py
    twext/branches/users/cdaboo/cfod/twext/enterprise/fixtures.py
    twext/branches/users/cdaboo/cfod/twext/enterprise/test/test_jobqueue.py
    twext/branches/users/cdaboo/cfod/twext/internet/threadutils.py

Property Changed:
----------------
    twext/branches/users/cdaboo/cfod/twext/platform/osx/

Modified: twext/branches/users/cdaboo/cfod/twext/enterprise/adbapi2.py
===================================================================
--- twext/branches/users/cdaboo/cfod/twext/enterprise/adbapi2.py	2015-08-01 14:52:13 UTC (rev 15022)
+++ twext/branches/users/cdaboo/cfod/twext/enterprise/adbapi2.py	2015-08-03 16:39:27 UTC (rev 15023)
@@ -646,6 +646,7 @@
         """
         spooledBase = self._baseTxn
         self._baseTxn = baseTxn
+        self._baseTxn._label = self._label
         spooledBase._unspool(baseTxn)
 
 
@@ -941,6 +942,7 @@
         def _reallyClose():
             if self._connection:
                 self._connection.close()
+                self._connection = None
 
         self._holder.submit(_reallyClose)
 
@@ -1161,7 +1163,6 @@
 
         def finishInit((connection, cursor)):
             if txn._aborted:
-                connection.close()
                 return
             baseTxn = _ConnectedTxn(
                 pool=self,

Modified: twext/branches/users/cdaboo/cfod/twext/enterprise/fixtures.py
===================================================================
--- twext/branches/users/cdaboo/cfod/twext/enterprise/fixtures.py	2015-08-01 14:52:13 UTC (rev 15022)
+++ twext/branches/users/cdaboo/cfod/twext/enterprise/fixtures.py	2015-08-03 16:39:27 UTC (rev 15023)
@@ -27,10 +27,10 @@
 from zope.interface.verify import verifyClass
 
 from twisted.internet.interfaces import IReactorThreads
+from twisted.internet.defer import Deferred
+from twisted.internet.task import Clock
 from twisted.python.threadpool import ThreadPool
 
-from twisted.internet.task import Clock
-
 from twext.enterprise.adbapi2 import ConnectionPool
 from twext.enterprise.ienterprise import SQLITE_DIALECT
 from twext.enterprise.ienterprise import POSTGRES_DIALECT
@@ -118,6 +118,10 @@
         return super(FakeThreadHolder, self).start()
 
 
+    def retry(self):
+        pass
+
+
     def stop(self):
         result = super(FakeThreadHolder, self).stop()
         self.stopped = True
@@ -147,6 +151,10 @@
         self._q_ = newq
 
 
+    def callLater(self, f, *a, **k):
+        return Deferred()
+
+
     def callFromThread(self, f, *a, **k):
         result = f(*a, **k)
         return result

Modified: twext/branches/users/cdaboo/cfod/twext/enterprise/test/test_jobqueue.py
===================================================================
--- twext/branches/users/cdaboo/cfod/twext/enterprise/test/test_jobqueue.py	2015-08-01 14:52:13 UTC (rev 15022)
+++ twext/branches/users/cdaboo/cfod/twext/enterprise/test/test_jobqueue.py	2015-08-03 16:39:27 UTC (rev 15023)
@@ -1580,7 +1580,8 @@
         @transactionally(self.store.newTransaction)
         def _enqueue(txn):
             return txn.enqueue(
-                DummyWorkPauseItem, a=30, b=40, workID=1
+                DummyWorkPauseItem, a=30, b=40, workID=1,
+                notBefore=datetime.datetime.utcnow() + datetime.timedelta(seconds=1)
             )
         yield _enqueue
 
@@ -1663,7 +1664,8 @@
         @transactionally(self.store.newTransaction)
         def _enqueue(txn):
             return txn.enqueue(
-                DummyWorkPauseItem, a=30, b=40, workID=1
+                DummyWorkPauseItem, a=30, b=40, workID=1,
+                notBefore=datetime.datetime.utcnow() + datetime.timedelta(seconds=1)
             )
         yield _enqueue
 
@@ -1731,7 +1733,8 @@
         @transactionally(self.store.newTransaction)
         def _enqueue(txn):
             return txn.enqueue(
-                DummyWorkPauseItem, a=30, b=40, workID=1
+                DummyWorkPauseItem, a=30, b=40, workID=1,
+                notBefore=datetime.datetime.utcnow() + datetime.timedelta(seconds=1)
             )
         yield _enqueue
 

Modified: twext/branches/users/cdaboo/cfod/twext/internet/threadutils.py
===================================================================
--- twext/branches/users/cdaboo/cfod/twext/internet/threadutils.py	2015-08-01 14:52:13 UTC (rev 15022)
+++ twext/branches/users/cdaboo/cfod/twext/internet/threadutils.py	2015-08-03 16:39:27 UTC (rev 15023)
@@ -24,9 +24,10 @@
 
 _DONE = object()
 
-_STATE_STOPPED = 'STOPPED'
+_STATE_STARTING = 'STARTING'
 _STATE_RUNNING = 'RUNNING'
 _STATE_STOPPING = 'STOPPING'
+_STATE_STOPPED = 'STOPPED'
 
 class ThreadHolder(object):
     """
@@ -39,12 +40,14 @@
         self._state = _STATE_STOPPED
         self._stopper = None
         self._q = None
+        self._retryCallback = None
 
 
     def _run(self):
         """
         Worker function which runs in a non-reactor thread.
         """
+        self._state = _STATE_RUNNING
         while self._qpull():
             pass
 
@@ -90,7 +93,7 @@
 
         @return: L{Deferred} that fires with the result of L{work}
         """
-        if self._state != _STATE_RUNNING:
+        if self._state not in (_STATE_RUNNING, _STATE_STARTING):
             raise RuntimeError("not running")
         d = Deferred()
         self._q.put((d, work))
@@ -103,18 +106,31 @@
         """
         if self._state != _STATE_STOPPED:
             raise RuntimeError("Not stopped.")
-        self._state = _STATE_RUNNING
+        self._state = _STATE_STARTING
         self._q = Queue(0)
         self._reactor.callInThread(self._run)
+        self.retry()
 
 
+    def retry(self):
+        if self._state == _STATE_STARTING:
+            if self._retryCallback is not None:
+                self._reactor.threadpool._startSomeWorkers()
+            self._retryCallback = self._reactor.callLater(0.1, self.retry)
+        else:
+            self._retryCallback = None
+
+
     def stop(self):
         """
         Stop this thing and release its thread, if it's running.
         """
-        if self._state != _STATE_RUNNING:
+        if self._state not in (_STATE_RUNNING, _STATE_STARTING):
             raise RuntimeError("Not running.")
         s = self._stopper = Deferred()
         self._state = _STATE_STOPPING
         self._q.put(_DONE)
+        if self._retryCallback:
+            self._retryCallback.cancel()
+            self._retryCallback = None
         return s


Property changes on: twext/branches/users/cdaboo/cfod/twext/platform/osx
___________________________________________________________________
Added: svn:ignore
   + _corefoundation.c

-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20150803/f20e4001/attachment.html>


More information about the calendarserver-changes mailing list