[CalendarServer-changes] [8732] CalendarServer/branches/users/cdaboo/txn-debugging
source_changes at macosforge.org
source_changes at macosforge.org
Mon Feb 20 14:24:07 PST 2012
Revision: 8732
http://trac.macosforge.org/projects/calendarserver/changeset/8732
Author: cdaboo at apple.com
Date: 2012-02-20 14:24:07 -0800 (Mon, 20 Feb 2012)
Log Message:
-----------
Add a way to log long transactions and to forcibly abort them based on a timeout.
Modified Paths:
--------------
CalendarServer/branches/users/cdaboo/txn-debugging/calendarserver/tap/util.py
CalendarServer/branches/users/cdaboo/txn-debugging/twext/enterprise/adbapi2.py
CalendarServer/branches/users/cdaboo/txn-debugging/twistedcaldav/stdconfig.py
CalendarServer/branches/users/cdaboo/txn-debugging/txdav/common/datastore/sql.py
CalendarServer/branches/users/cdaboo/txn-debugging/txdav/common/datastore/test/test_sql.py
Modified: CalendarServer/branches/users/cdaboo/txn-debugging/calendarserver/tap/util.py
===================================================================
--- CalendarServer/branches/users/cdaboo/txn-debugging/calendarserver/tap/util.py 2012-02-20 22:22:10 UTC (rev 8731)
+++ CalendarServer/branches/users/cdaboo/txn-debugging/calendarserver/tap/util.py 2012-02-20 22:24:07 UTC (rev 8732)
@@ -235,6 +235,8 @@
logLabels=config.LogDatabase.LabelsInSQL,
logStats=config.LogDatabase.Statistics,
logSQL=config.LogDatabase.SQLStatements,
+ logTransactionWaits=config.LogDatabase.TransactionWaitSeconds,
+ timeoutTransactions=config.TransactionTimeoutSeconds,
)
else:
return CommonFileDataStore(
Modified: CalendarServer/branches/users/cdaboo/txn-debugging/twext/enterprise/adbapi2.py
===================================================================
--- CalendarServer/branches/users/cdaboo/txn-debugging/twext/enterprise/adbapi2.py 2012-02-20 22:22:10 UTC (rev 8731)
+++ CalendarServer/branches/users/cdaboo/txn-debugging/twext/enterprise/adbapi2.py 2012-02-20 22:24:07 UTC (rev 8732)
@@ -507,7 +507,7 @@
def __init__(self, pool, baseTxn):
self._pool = pool
self._baseTxn = baseTxn
- self._complete = False
+ self._completed = False
self._currentBlock = None
self._blockedQueue = None
self._pendingBlocks = []
@@ -626,7 +626,7 @@
"""
If the transaction is complete, raise L{AlreadyFinishedError}
"""
- if self._complete:
+ if self._completed:
raise AlreadyFinishedError()
@@ -635,7 +635,7 @@
Mark the transaction as complete, raising AlreadyFinishedError.
"""
self._checkComplete()
- self._complete = True
+ self._completed = True
def commandBlock(self):
Modified: CalendarServer/branches/users/cdaboo/txn-debugging/twistedcaldav/stdconfig.py
===================================================================
--- CalendarServer/branches/users/cdaboo/txn-debugging/twistedcaldav/stdconfig.py 2012-02-20 22:22:10 UTC (rev 8731)
+++ CalendarServer/branches/users/cdaboo/txn-debugging/twistedcaldav/stdconfig.py 2012-02-20 22:24:07 UTC (rev 8732)
@@ -269,6 +269,10 @@
# to slave processes.
"UseDatabase" : True, # True: database; False: files
+
+ "TransactionTimeoutSeconds" : 0, # Timeout transactions that take longer than
+ # the specified number of seconds. Zero means
+ # no timeouts
"DBType" : "", # 2 possible values: empty, meaning 'spawn postgres
# yourself', or 'postgres', meaning 'connect to a
@@ -445,9 +449,10 @@
"GlobalStatsLoggingFrequency" : 12,
"LogDatabase" : {
- "LabelsInSQL" : False,
- "Statistics" : False,
- "SQLStatements" : False,
+ "LabelsInSQL" : False,
+ "Statistics" : False,
+ "SQLStatements" : False,
+ "TransactionWaitSeconds" : 0,
},
#
Modified: CalendarServer/branches/users/cdaboo/txn-debugging/txdav/common/datastore/sql.py
===================================================================
--- CalendarServer/branches/users/cdaboo/txn-debugging/txdav/common/datastore/sql.py 2012-02-20 22:22:10 UTC (rev 8731)
+++ CalendarServer/branches/users/cdaboo/txn-debugging/txdav/common/datastore/sql.py 2012-02-20 22:24:07 UTC (rev 8732)
@@ -38,6 +38,7 @@
from twisted.python.util import FancyEqMixin
from twisted.python.failure import Failure
+from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue, succeed
from twisted.application.service import Service
@@ -134,7 +135,8 @@
def __init__(self, sqlTxnFactory, notifierFactory, attachmentsPath,
enableCalendars=True, enableAddressBooks=True,
label="unlabeled", quota=(2 ** 20),
- logLabels=False, logStats=False, logSQL=False):
+ logLabels=False, logStats=False, logSQL=False,
+ logTransactionWaits=0, timeoutTransactions=0):
assert enableCalendars or enableAddressBooks
self.sqlTxnFactory = sqlTxnFactory
@@ -147,6 +149,8 @@
self.logLabels = logLabels
self.logStats = logStats
self.logSQL = logSQL
+ self.logTransactionWaits = logTransactionWaits
+ self.timeoutTransactions = timeoutTransactions
self._migrating = False
self._enableNotifications = True
@@ -170,7 +174,7 @@
"""
@see L{IDataStore.newTransaction}
"""
- return CommonStoreTransaction(
+ txn = CommonStoreTransaction(
self,
self.sqlTxnFactory(),
self.enableCalendars,
@@ -179,6 +183,10 @@
label,
self._migrating,
)
+
+ if self.logTransactionWaits or self.timeoutTransactions:
+ CommonStoreTransactionMonitor(txn, self.logTransactionWaits, self.timeoutTransactions)
+ return txn
def setMigrating(self, state):
"""
@@ -229,6 +237,56 @@
toFile.write("\n")
toFile.write("***\n")
+class CommonStoreTransactionMonitor(object):
+ """
+ Object that monitors the state of a transaction over time and logs or times out
+ the transaction.
+ """
+
+ def __init__(self, txn, logTimerSeconds, timeoutSeconds):
+ self.txn = txn
+ self.delayedLog = None
+ self.delayedTimeout = None
+ self.logTimerSeconds = logTimerSeconds
+ self.timeoutSeconds = timeoutSeconds
+
+ self.txn.postCommit(self._cleanTxn)
+ self.txn.postAbort(self._cleanTxn)
+
+ self._installLogTimer()
+ self._installTimeout()
+
+ def _cleanTxn(self):
+ self.txn = None
+ if self.delayedLog:
+ self.delayedLog.cancel()
+ self.delayedLog = None
+ if self.delayedTimeout:
+ self.delayedTimeout.cancel()
+ self.delayedTimeout = None
+
+ def _installLogTimer(self):
+ def _logTransactionWait():
+ if self.txn is not None:
+ log.error("Transaction wait: %r, IUDs: %d, Statement: %s" % (self.txn, self.txn.iudCount, self.txn.currentStatement if self.txn.currentStatement else "None",))
+ self.delayedLog = reactor.callLater(self.logTimerSeconds, _logTransactionWait)
+
+ if self.logTimerSeconds:
+ self.delayedLog = reactor.callLater(self.logTimerSeconds, _logTransactionWait)
+
+ def _installTimeout(self):
+ def _forceAbort():
+ if self.txn is not None:
+ log.error("Transaction abort too long: %r, IUDs: %d, Statement: %s" % (self.txn, self.txn.iudCount, self.txn.currentStatement if self.txn.currentStatement else "None",))
+ self.delayedTimeout = None
+ if self.delayedLog:
+ self.delayedLog.cancel()
+ self.delayedLog = None
+ self.txn.abort()
+
+ if self.timeoutSeconds:
+ self.delayedTimeout = reactor.callLater(self.timeoutSeconds, _forceAbort)
+
class CommonStoreTransaction(object):
"""
Transaction implementation for SQL database.
@@ -273,8 +331,9 @@
self.paramstyle = sqlTxn.paramstyle
self.dialect = sqlTxn.dialect
- # FIXME: want to pass a "debug" option in to enable this via config - off for now
self._stats = TransactionStatsCollector() if self._store.logStats else None
+ self.iudCount = 0
+ self.currentStatement = None
def store(self):
@@ -553,13 +612,19 @@
"""
if self._stats:
self._stats.startStatement(a[0])
+ self.currentStatement = a[0]
+ if self._store.logTransactionWaits and a[0].split(" ", 1)[0].lower() in ("insert", "update", "delete",):
+ self.iudCount += 1
if self._store.logLabels:
a = ("-- Label: %s\n" % (self._label.replace("%", "%%"),) + a[0],) + a[1:]
if self._store.logSQL:
log.error("SQL: %r %r" % (a, kw,))
- results = (yield self._sqlTxn.execSQL(*a, **kw))
- if self._stats:
- self._stats.endStatement()
+ try:
+ results = (yield self._sqlTxn.execSQL(*a, **kw))
+ finally:
+ self.currentStatement = None
+ if self._stats:
+ self._stats.endStatement()
returnValue(results)
@inlineCallbacks
Modified: CalendarServer/branches/users/cdaboo/txn-debugging/txdav/common/datastore/test/test_sql.py
===================================================================
--- CalendarServer/branches/users/cdaboo/txn-debugging/txdav/common/datastore/test/test_sql.py 2012-02-20 22:22:10 UTC (rev 8731)
+++ CalendarServer/branches/users/cdaboo/txn-debugging/txdav/common/datastore/test/test_sql.py 2012-02-20 22:24:07 UTC (rev 8732)
@@ -19,8 +19,10 @@
"""
from twext.enterprise.dal.syntax import Select
-from twisted.internet.defer import inlineCallbacks
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks, Deferred
from twisted.trial.unittest import TestCase
+from txdav.common.datastore.sql import log
from txdav.common.datastore.sql_tables import schema
from txdav.common.datastore.test.util import CommonCommonTests, buildStore
from txdav.common.icommondatastore import AllRetriesFailed
@@ -69,6 +71,92 @@
self.assertEqual(len(version), 1)
self.assertEqual(len(version[0]), 1)
+ def test_logWaits(self):
+ """
+ CommonStoreTransactionMonitor logs waiting transactions.
+ """
+
+ # Patch the existing store directly to turn on transaction wait logging
+ self.patch(self._sqlStore, "logTransactionWaits", 1)
+
+ ctr = [0]
+ def counter(_ignore):
+ ctr[0] += 1
+ self.patch(log, "error", counter)
+
+ txn = self.transactionUnderTest()
+
+ def doTest(result):
+ self.assertNotEqual(ctr[0], 0)
+ txn.abort()
+ d = Deferred()
+ d.addCallback(doTest)
+
+ def checkIt():
+ d.callback(None)
+ reactor.callLater(2, checkIt)
+
+ return d
+
+ def test_txnTimeout(self):
+ """
+ CommonStoreTransactionMonitor terminates long transactions.
+ """
+
+ # Patch the existing store directly to turn on transaction timeouts
+ self.patch(self._sqlStore, "timeoutTransactions", 1)
+
+ ctr = [0]
+ def counter(_ignore):
+ ctr[0] += 1
+ self.patch(log, "error", counter)
+
+ txn = self.transactionUnderTest()
+
+ def doTest(result):
+ self.assertNotEqual(ctr[0], 0)
+ self.assertTrue(txn._sqlTxn._completed)
+ d = Deferred()
+ d.addCallback(doTest)
+
+ def checkIt():
+ d.callback(None)
+ reactor.callLater(2, checkIt)
+
+ return d
+
+ def test_logWaitsAndTxnTimeout(self):
+ """
+ CommonStoreTransactionMonitor logs waiting transactions and terminates long transactions.
+ """
+
+ # Patch the existing store directly to turn on both wait logging and transaction timeouts
+ self.patch(self._sqlStore, "logTransactionWaits", 1)
+ self.patch(self._sqlStore, "timeoutTransactions", 2)
+
+ ctr = [0, 0]
+ def counter(logStr):
+ if "wait" in logStr:
+ ctr[0] += 1
+ elif "abort" in logStr:
+ ctr[1] += 1
+ self.patch(log, "error", counter)
+
+ txn = self.transactionUnderTest()
+
+ def doTest(result):
+ self.assertNotEqual(ctr[0], 0)
+ self.assertNotEqual(ctr[1], 0)
+ self.assertTrue(txn._sqlTxn._completed)
+ d = Deferred()
+ d.addCallback(doTest)
+
+ def checkIt():
+ d.callback(None)
+ reactor.callLater(3, checkIt)
+
+ return d
+
@inlineCallbacks
def test_subtransactionOK(self):
"""
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20120220/2e87e52b/attachment-0001.html>
More information about the calendarserver-changes mailing list