[CalendarServer-changes] [15095] twext/trunk/twext/enterprise/adbapi2.py

source_changes at macosforge.org source_changes at macosforge.org
Thu Sep 3 13:40:30 PDT 2015


Revision: 15095
          http://trac.calendarserver.org//changeset/15095
Author:   cdaboo at apple.com
Date:     2015-09-03 13:40:30 -0700 (Thu, 03 Sep 2015)
Log Message:
-----------
Add warning log messages when a txn stalls because the DB connection pool is fully utilized. Also switch adbapi2 logging from twisted.python.log to twext.python.log.Logger.

Modified Paths:
--------------
    twext/trunk/twext/enterprise/adbapi2.py

Modified: twext/trunk/twext/enterprise/adbapi2.py
===================================================================
--- twext/trunk/twext/enterprise/adbapi2.py	2015-09-03 20:39:06 UTC (rev 15094)
+++ twext/trunk/twext/enterprise/adbapi2.py	2015-09-03 20:40:30 UTC (rev 15095)
@@ -31,6 +31,7 @@
 
 import sys
 import weakref
+import time
 
 from cStringIO import StringIO
 from cPickle import dumps, loads
@@ -47,7 +48,6 @@
 from twisted.protocols.amp import Argument, String, Command, AMP, Integer
 from twisted.internet import reactor as _reactor
 from twisted.application.service import Service
-from twisted.python import log
 from twisted.internet.defer import maybeDeferred
 from twisted.python.components import proxyForInterface
 
@@ -63,7 +63,10 @@
     AlreadyFinishedError, IAsyncTransaction, POSTGRES_DIALECT, ICommandBlock
 )
 
+from twext.python.log import Logger
+log = Logger()
 
+
 # FIXME: there should be no defaults for connection metadata, it should be
 # discovered dynamically everywhere.  Right now it's specified as an explicit
 # argument to the ConnectionPool but it should probably be determined
@@ -252,11 +255,11 @@
                 # Report the error before doing anything else, since doing
                 # other things may cause the traceback stack to be eliminated
                 # if they raise exceptions (even internally).
-                log.err(
-                    Failure(),
+                log.failure(
                     "Exception from execute() on first statement in "
                     "transaction.  Possibly caused by a database server "
-                    "restart.  Automatically reconnecting now."
+                    "restart.  Automatically reconnecting now.",
+                    failure=Failure(),
                 )
                 try:
                     self._connection.close()
@@ -273,10 +276,10 @@
                     # making debugging surprising error conditions very
                     # difficult, so let's make sure that the error is logged
                     # just in case.
-                    log.err(
-                        Failure(),
+                    log.failure(
                         "Exception from close() while automatically "
-                        "reconnecting. (Probably not serious.)"
+                        "reconnecting. (Probably not serious.)",
+                        failure=Failure(),
                     )
 
                 # Now, if either of *these* things fail, there's an error here
@@ -373,11 +376,15 @@
 
 
     def abort(self):
-        return self._end(self._connection.rollback).addErrback(log.err)
+        def _report(f):
+            log.failure("txn abort", failure=f)
+        return self._end(self._connection.rollback).addErrback(_report)
 
 
     def terminate(self):
-        return self._end(self._connection.rollback, terminate=True).addErrback(log.err)
+        def _report(f):
+            log.failure("txn abort", failure=f)
+        return self._end(self._connection.rollback, terminate=True).addErrback(_report)
 
 
     def reset(self):
@@ -1121,12 +1128,31 @@
             basetxn._label = label
             self._busy.append(basetxn)
             txn = _SingleTxn(self, basetxn)
+            log.debug(
+                "ConnectionPool: txn busy '{label}': free={free}, busy={busy}, waiting={waiting}",
+                label=label,
+                free=len(self._free),
+                busy=len(self._busy) + len(self._finishing),
+                waiting=len(self._waiting),
+            )
         else:
             txn = _SingleTxn(self, _WaitingTxn(self, label=label))
             self._waiting.append(txn)
+            blocked = self._activeConnectionCount() >= self.maxConnections
+            if blocked:
+                txn._blocked_waiting_time = time.time()
+                log.warn("ConnectionPool: txn blocked '{label}'", label=label)
+            log.debug(
+                "ConnectionPool: txn waiting add '{label}': free={free}, busy={busy}, waiting={waiting} {blocked}",
+                label=label,
+                free=len(self._free),
+                busy=len(self._busy) + len(self._finishing),
+                waiting=len(self._waiting),
+                blocked="blocked" if blocked else "",
+            )
             # FIXME/TESTME: should be len(self._busy) + len(self._finishing)
             # (free doesn't need to be considered, as it's tested above)
-            if self._activeConnectionCount() < self.maxConnections:
+            if not blocked:
                 self._startOneMore()
 
         return txn
@@ -1172,7 +1198,7 @@
             self._repoolNow(baseTxn)
 
         def maybeTryAgain(f):
-            log.err(f, "Re-trying connection due to connection failure")
+            log.failure("Re-trying connection due to connection failure", failure=f)
             txn._retry = self.reactor.callLater(self.RETRY_TIMEOUT, resubmit)
 
         def resubmit():
@@ -1214,12 +1240,32 @@
             waiting = self._waiting.pop(0)
             self._busy.append(txn)
             waiting._unspoolOnto(txn)
+            if hasattr(waiting, "_blocked_waiting_time"):
+                log.warn(
+                    "ConnectionPool: txn unblocked '{label}': delay {delay:.1f}ms",
+                    label=waiting._label,
+                    delay=1000 * (time.time() - waiting._blocked_waiting_time),
+                )
+            log.debug(
+                "ConnectionPool: txn waiting remove '{label}': free={free}, busy={busy}, waiting={waiting}",
+                label=waiting._label,
+                free=len(self._free),
+                busy=len(self._busy) + len(self._finishing),
+                waiting=len(self._waiting),
+            )
         else:
             # If we are stopping, never add to the free list - release it
             if self._stopping:
                 txn._releaseConnection()
             else:
                 self._free.append(txn)
+                log.debug(
+                    "ConnectionPool: txn free '{label}': free={free}, busy={busy}, waiting={waiting}",
+                    label=txn._label,
+                    free=len(self._free),
+                    busy=len(self._busy) + len(self._finishing),
+                    waiting=len(self._waiting),
+                )
 
 
 
@@ -1309,7 +1355,7 @@
                 if f.type in command.errors:
                     returnValue(f)
                 else:
-                    log.err(Failure(), "shared database connection pool error")
+                    log.failure("shared database connection pool error", failure=f)
                     raise FailsafeException()
             else:
                 returnValue(val)
@@ -1423,7 +1469,7 @@
 
 
     def stopReceivingBoxes(self, why):
-        log.msg("(S) Stopped receiving boxes: " + why.getTraceback())
+        log.info("(S) Stopped receiving boxes: {}tb", tb=why.getTraceback())
 
 
     def unhandledError(self, failure):
@@ -1431,7 +1477,7 @@
         An unhandled error has occurred.  Since we can't really classify errors
         well on this protocol, log it and forget it.
         """
-        log.err(failure, "Shared connection pool server encountered an error.")
+        log.failure("Shared connection pool server encountered an error.", failure=failure)
 
 
     @failsafeResponder(StartTxn)
@@ -1544,11 +1590,11 @@
         An unhandled error has occurred.  Since we can't really classify errors
         well on this protocol, log it and forget it.
         """
-        log.err(failure, "Shared connection pool client encountered an error.")
+        log.failure("Shared connection pool client encountered an error.", failure=failure)
 
 
     def stopReceivingBoxes(self, why):
-        log.msg("(C) Stopped receiving boxes: " + why.getTraceback())
+        log.info("(C) Stopped receiving boxes: {tb}", tb=why.getTraceback())
 
 
     def newTransaction(self):
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20150903/0d746e6a/attachment.html>


More information about the calendarserver-changes mailing list