[CalendarServer-changes] [8732] CalendarServer/branches/users/cdaboo/txn-debugging

source_changes at macosforge.org source_changes at macosforge.org
Mon Feb 20 14:24:07 PST 2012


Revision: 8732
          http://trac.macosforge.org/projects/calendarserver/changeset/8732
Author:   cdaboo at apple.com
Date:     2012-02-20 14:24:07 -0800 (Mon, 20 Feb 2012)
Log Message:
-----------
Add a way to log long transactions and to forcibly abort them based on a timeout.

Modified Paths:
--------------
    CalendarServer/branches/users/cdaboo/txn-debugging/calendarserver/tap/util.py
    CalendarServer/branches/users/cdaboo/txn-debugging/twext/enterprise/adbapi2.py
    CalendarServer/branches/users/cdaboo/txn-debugging/twistedcaldav/stdconfig.py
    CalendarServer/branches/users/cdaboo/txn-debugging/txdav/common/datastore/sql.py
    CalendarServer/branches/users/cdaboo/txn-debugging/txdav/common/datastore/test/test_sql.py

Modified: CalendarServer/branches/users/cdaboo/txn-debugging/calendarserver/tap/util.py
===================================================================
--- CalendarServer/branches/users/cdaboo/txn-debugging/calendarserver/tap/util.py	2012-02-20 22:22:10 UTC (rev 8731)
+++ CalendarServer/branches/users/cdaboo/txn-debugging/calendarserver/tap/util.py	2012-02-20 22:24:07 UTC (rev 8732)
@@ -235,6 +235,8 @@
             logLabels=config.LogDatabase.LabelsInSQL,
             logStats=config.LogDatabase.Statistics,
             logSQL=config.LogDatabase.SQLStatements,
+            logTransactionWaits=config.LogDatabase.TransactionWaitSeconds,
+            timeoutTransactions=config.TransactionTimeoutSeconds,
         )
     else:
         return CommonFileDataStore(

Modified: CalendarServer/branches/users/cdaboo/txn-debugging/twext/enterprise/adbapi2.py
===================================================================
--- CalendarServer/branches/users/cdaboo/txn-debugging/twext/enterprise/adbapi2.py	2012-02-20 22:22:10 UTC (rev 8731)
+++ CalendarServer/branches/users/cdaboo/txn-debugging/twext/enterprise/adbapi2.py	2012-02-20 22:24:07 UTC (rev 8732)
@@ -507,7 +507,7 @@
     def __init__(self, pool, baseTxn):
         self._pool           = pool
         self._baseTxn        = baseTxn
-        self._complete       = False
+        self._completed      = False
         self._currentBlock   = None
         self._blockedQueue   = None
         self._pendingBlocks  = []
@@ -626,7 +626,7 @@
         """
         If the transaction is complete, raise L{AlreadyFinishedError}
         """
-        if self._complete:
+        if self._completed:
             raise AlreadyFinishedError()
 
 
@@ -635,7 +635,7 @@
         Mark the transaction as complete, raising AlreadyFinishedError.
         """
         self._checkComplete()
-        self._complete = True
+        self._completed = True
 
 
     def commandBlock(self):

Modified: CalendarServer/branches/users/cdaboo/txn-debugging/twistedcaldav/stdconfig.py
===================================================================
--- CalendarServer/branches/users/cdaboo/txn-debugging/twistedcaldav/stdconfig.py	2012-02-20 22:22:10 UTC (rev 8731)
+++ CalendarServer/branches/users/cdaboo/txn-debugging/twistedcaldav/stdconfig.py	2012-02-20 22:24:07 UTC (rev 8732)
@@ -269,6 +269,10 @@
                            # to slave processes.
 
     "UseDatabase"  : True, # True: database; False: files
+    
+    "TransactionTimeoutSeconds" : 0, # Timeout transactions that take longer than
+                              # the specified number of seconds. Zero means
+                              # no timeouts
 
     "DBType"       : "",   # 2 possible values: empty, meaning 'spawn postgres
                            # yourself', or 'postgres', meaning 'connect to a
@@ -445,9 +449,10 @@
     "GlobalStatsLoggingFrequency" : 12,
     
     "LogDatabase" : {
-        "LabelsInSQL"   : False,
-        "Statistics"    : False,
-        "SQLStatements" : False,
+        "LabelsInSQL"            : False,
+        "Statistics"             : False,
+        "SQLStatements"          : False,
+        "TransactionWaitSeconds" : 0, 
     },
 
     #

Modified: CalendarServer/branches/users/cdaboo/txn-debugging/txdav/common/datastore/sql.py
===================================================================
--- CalendarServer/branches/users/cdaboo/txn-debugging/txdav/common/datastore/sql.py	2012-02-20 22:22:10 UTC (rev 8731)
+++ CalendarServer/branches/users/cdaboo/txn-debugging/txdav/common/datastore/sql.py	2012-02-20 22:24:07 UTC (rev 8732)
@@ -38,6 +38,7 @@
 from twisted.python.util import FancyEqMixin
 from twisted.python.failure import Failure
 
+from twisted.internet import reactor
 from twisted.internet.defer import inlineCallbacks, returnValue, succeed
 
 from twisted.application.service import Service
@@ -134,7 +135,8 @@
     def __init__(self, sqlTxnFactory, notifierFactory, attachmentsPath,
                  enableCalendars=True, enableAddressBooks=True,
                  label="unlabeled", quota=(2 ** 20),
-                 logLabels=False, logStats=False, logSQL=False):
+                 logLabels=False, logStats=False, logSQL=False,
+                 logTransactionWaits=0, timeoutTransactions=0):
         assert enableCalendars or enableAddressBooks
 
         self.sqlTxnFactory = sqlTxnFactory
@@ -147,6 +149,8 @@
         self.logLabels = logLabels
         self.logStats = logStats
         self.logSQL = logSQL
+        self.logTransactionWaits = logTransactionWaits
+        self.timeoutTransactions = timeoutTransactions
         self._migrating = False
         self._enableNotifications = True
 
@@ -170,7 +174,7 @@
         """
         @see L{IDataStore.newTransaction}
         """
-        return CommonStoreTransaction(
+        txn = CommonStoreTransaction(
             self,
             self.sqlTxnFactory(),
             self.enableCalendars,
@@ -179,6 +183,10 @@
             label,
             self._migrating,
         )
+        
+        if self.logTransactionWaits or self.timeoutTransactions:
+            CommonStoreTransactionMonitor(txn, self.logTransactionWaits, self.timeoutTransactions)
+        return txn
 
     def setMigrating(self, state):
         """
@@ -229,6 +237,56 @@
             toFile.write("\n")
         toFile.write("***\n")
 
+class CommonStoreTransactionMonitor(object):
+    """
+    Object that monitors the state of a transaction over time and logs or times out
+    the transaction.
+    """
+    
+    def __init__(self, txn, logTimerSeconds, timeoutSeconds):
+        self.txn = txn
+        self.delayedLog = None
+        self.delayedTimeout = None
+        self.logTimerSeconds = logTimerSeconds
+        self.timeoutSeconds = timeoutSeconds
+        
+        self.txn.postCommit(self._cleanTxn)
+        self.txn.postAbort(self._cleanTxn)
+        
+        self._installLogTimer()
+        self._installTimeout()
+    
+    def _cleanTxn(self):
+        self.txn = None
+        if self.delayedLog:
+            self.delayedLog.cancel()
+            self.delayedLog = None
+        if self.delayedTimeout:
+            self.delayedTimeout.cancel()
+            self.delayedTimeout = None
+
+    def _installLogTimer(self):
+        def _logTransactionWait():
+            if self.txn is not None:
+                log.error("Transaction wait: %r, IUDs: %d, Statement: %s" % (self.txn, self.txn.iudCount, self.txn.currentStatement if self.txn.currentStatement else "None",))
+                self.delayedLog = reactor.callLater(self.logTimerSeconds, _logTransactionWait)
+
+        if self.logTimerSeconds:
+            self.delayedLog = reactor.callLater(self.logTimerSeconds, _logTransactionWait)
+    
+    def _installTimeout(self):
+        def _forceAbort():
+            if self.txn is not None:
+                log.error("Transaction abort too long: %r, IUDs: %d, Statement: %s" % (self.txn, self.txn.iudCount, self.txn.currentStatement if self.txn.currentStatement else "None",))
+                self.delayedTimeout = None
+                if self.delayedLog:
+                    self.delayedLog.cancel()
+                    self.delayedLog = None
+                self.txn.abort()
+
+        if self.timeoutSeconds:
+            self.delayedTimeout = reactor.callLater(self.timeoutSeconds, _forceAbort)
+
 class CommonStoreTransaction(object):
     """
     Transaction implementation for SQL database.
@@ -273,8 +331,9 @@
         self.paramstyle = sqlTxn.paramstyle
         self.dialect = sqlTxn.dialect
         
-        # FIXME: want to pass a "debug" option in to enable this via config - off for now
         self._stats = TransactionStatsCollector() if self._store.logStats else None
+        self.iudCount = 0
+        self.currentStatement = None
 
 
     def store(self):
@@ -553,13 +612,19 @@
         """
         if self._stats:        
             self._stats.startStatement(a[0])
+        self.currentStatement = a[0]
+        if self._store.logTransactionWaits and a[0].split(" ", 1)[0].lower() in ("insert", "update", "delete",):
+            self.iudCount += 1
         if self._store.logLabels:
             a = ("-- Label: %s\n" % (self._label.replace("%", "%%"),) + a[0],) + a[1:]
         if self._store.logSQL:
             log.error("SQL: %r %r" % (a, kw,))
-        results = (yield self._sqlTxn.execSQL(*a, **kw))
-        if self._stats:        
-            self._stats.endStatement()
+        try:
+            results = (yield self._sqlTxn.execSQL(*a, **kw))
+        finally:
+            self.currentStatement = None
+            if self._stats:        
+                self._stats.endStatement()
         returnValue(results)
 
     @inlineCallbacks

Modified: CalendarServer/branches/users/cdaboo/txn-debugging/txdav/common/datastore/test/test_sql.py
===================================================================
--- CalendarServer/branches/users/cdaboo/txn-debugging/txdav/common/datastore/test/test_sql.py	2012-02-20 22:22:10 UTC (rev 8731)
+++ CalendarServer/branches/users/cdaboo/txn-debugging/txdav/common/datastore/test/test_sql.py	2012-02-20 22:24:07 UTC (rev 8732)
@@ -19,8 +19,10 @@
 """
 
 from twext.enterprise.dal.syntax import Select
-from twisted.internet.defer import inlineCallbacks
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks, Deferred
 from twisted.trial.unittest import TestCase
+from txdav.common.datastore.sql import log
 from txdav.common.datastore.sql_tables import schema
 from txdav.common.datastore.test.util import CommonCommonTests, buildStore
 from txdav.common.icommondatastore import AllRetriesFailed
@@ -69,6 +71,92 @@
         self.assertEqual(len(version), 1)
         self.assertEqual(len(version[0]), 1)
 
+    def test_logWaits(self):
+        """
+        CommonStoreTransactionMonitor logs waiting transactions.
+        """
+        
+        # Patch the store's logTransactionWaits setting directly to turn on log waits
+        self.patch(self._sqlStore, "logTransactionWaits", 1)
+        
+        ctr = [0]
+        def counter(_ignore):
+            ctr[0] += 1
+        self.patch(log, "error", counter)
+
+        txn = self.transactionUnderTest()
+        
+        def doTest(result):
+            self.assertNotEqual(ctr[0], 0)
+            txn.abort()
+        d = Deferred()
+        d.addCallback(doTest)
+
+        def checkIt():
+            d.callback(None)
+        reactor.callLater(2, checkIt)
+
+        return d
+
+    def test_txnTimeout(self):
+        """
+        CommonStoreTransactionMonitor terminates long transactions.
+        """
+        
+        # Patch the store's timeoutTransactions setting directly to turn on transaction timeouts
+        self.patch(self._sqlStore, "timeoutTransactions", 1)
+        
+        ctr = [0]
+        def counter(_ignore):
+            ctr[0] += 1
+        self.patch(log, "error", counter)
+
+        txn = self.transactionUnderTest()
+        
+        def doTest(result):
+            self.assertNotEqual(ctr[0], 0)
+            self.assertTrue(txn._sqlTxn._completed)
+        d = Deferred()
+        d.addCallback(doTest)
+
+        def checkIt():
+            d.callback(None)
+        reactor.callLater(2, checkIt)
+
+        return d
+
+    def test_logWaitsAndTxnTimeout(self):
+        """
+        CommonStoreTransactionMonitor logs waiting transactions and terminates long transactions.
+        """
+        
+        # Patch the store directly to turn on both log waits and transaction timeouts
+        self.patch(self._sqlStore, "logTransactionWaits", 1)
+        self.patch(self._sqlStore, "timeoutTransactions", 2)
+        
+        ctr = [0, 0]
+        def counter(logStr):
+            if "wait" in logStr:
+                ctr[0] += 1
+            elif "abort" in logStr:
+                ctr[1] += 1
+        self.patch(log, "error", counter)
+
+        txn = self.transactionUnderTest()
+        
+        def doTest(result):
+            self.assertNotEqual(ctr[0], 0)
+            self.assertNotEqual(ctr[1], 0)
+            self.assertTrue(txn._sqlTxn._completed)
+        d = Deferred()
+        d.addCallback(doTest)
+
+        def checkIt():
+            d.callback(None)
+        reactor.callLater(3, checkIt)
+
+        return d
+
     @inlineCallbacks
     def test_subtransactionOK(self):
         """
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20120220/2e87e52b/attachment-0001.html>


More information about the calendarserver-changes mailing list