[CalendarServer-changes] [11366] CalendarServer/trunk/twext/python
source_changes at macosforge.org
source_changes at macosforge.org
Fri Jun 14 12:32:07 PDT 2013
Revision: 11366
http://trac.calendarserver.org//changeset/11366
Author: wsanchez at apple.com
Date: 2013-06-14 12:32:07 -0700 (Fri, 14 Jun 2013)
Log Message:
-----------
Add FilteringLogObserver.
Get rid of Wrapper suffix on observer class names.
Get rid of LogLevelFilteringLogObserverWrapper, replace with FilteringLogObserver and PredicateResult.
Modified Paths:
--------------
CalendarServer/trunk/twext/python/log.py
CalendarServer/trunk/twext/python/test/test_log.py
Modified: CalendarServer/trunk/twext/python/log.py
===================================================================
--- CalendarServer/trunk/twext/python/log.py 2013-06-14 19:25:43 UTC (rev 11365)
+++ CalendarServer/trunk/twext/python/log.py 2013-06-14 19:32:07 UTC (rev 11366)
@@ -70,8 +70,11 @@
"ILogObserver",
"ILegacyLogObserver",
"LogPublisher",
- "LogLevelFilteringLogObserverWrapper",
- "LegacyLogObserverWrapper",
+ "PredicateResult",
+ "ILogFilterPredicate",
+ "FilteringLogObserver",
+ "LogLevelFilterPredicate",
+ "LegacyLogObserver",
#"StandardIOObserver",
]
@@ -503,6 +506,7 @@
"""
An observer which can handle log events.
"""
+
def __call__(event):
"""
Log an event.
@@ -576,38 +580,94 @@
+class PredicateResult(Names):
+ """
+ Predicate results.
+ """
+ yes = NamedConstant() # Log this
+ no = NamedConstant() # Don't log this
+ maybe = NamedConstant() # No opinion
+
+
+
+class ILogFilterPredicate(Interface):
+ """
+ A predicate that determines whether an event should be logged.
+ """
+
+ def __call__(event):
+ """
+ Determine whether an event should be logged.
+
+ @returns: a L{PredicateResult}.
+ """
+
+
+
@implementer(ILogObserver)
-class LogLevelFilteringLogObserverWrapper(object):
+class FilteringLogObserver(object):
"""
- L{ILogObserver} that wraps another L{ILogObserver}, but does not
- forward events which have a L{LogLevel} lower than is configured
- for the event's namespace.
+ L{ILogObserver} that wraps another L{ILogObserver}, but filters
+ out events based on applying a series of L{ILogFilterPredicate}s.
"""
- def __init__(self, observer):
+ def __init__(self, observer, predicates):
"""
@param observer: an L{ILogObserver} to which this observer
will forward events.
+
+ @param predicates: an ordered iterable of predicates to apply
+ to events before forwarding to the wrapped observer.
"""
- self.observer = observer
+ self.observer = observer
+ self.predicates = list(predicates)
- @staticmethod
- def eventShouldLog(event):
- if event["log_level"] >= logLevelForNamespace(event["log_namespace"]):
- return True
- else:
- return False
+ def shouldLogEvent(self, event):
+ """
+ Determine whether an event should be logged, based on
+ C{self.predicates}.
+ @param event: an event
+ """
+ for predicate in self.predicates:
+ result = predicate(event)
+ if result == PredicateResult.yes:
+ return True
+ if result == PredicateResult.no:
+ return False
+ if result == PredicateResult.maybe:
+ continue
+ raise AssertionError("Unknown predicate result: {0}".format(result))
+ return True
+
def __call__(self, event):
- if self.eventShouldLog(event):
+ if self.shouldLogEvent(event):
self.observer(event)
+@implementer(ILogFilterPredicate)
+class LogLevelFilterPredicate(object):
+ """
+ L{ILogFilterPredicate} that filters out events with a log level
+ lower than the log level for the event's namespace.
+ """
+
+ def __call__(self, event):
+ level = event["log_level"]
+ namespace = event["log_namespace"]
+
+ if level < logLevelForNamespace(namespace):
+ return PredicateResult.no
+
+ return PredicateResult.maybe
+
+
+
@implementer(ILogObserver)
-class LegacyLogObserverWrapper(object):
+class LegacyLogObserver(object):
"""
L{ILogObserver} that wraps an L{ILegacyLogObserver}.
"""
@@ -659,6 +719,7 @@
#
# Utilities
#
+
class CallMapping(object):
def __init__(self, submapping):
self._submapping = submapping
@@ -699,10 +760,10 @@
@rtype: L{unicode}
"""
return unicode(
- _theFormatter.vformat(formatString, (), CallMapping(mapping))
+ formatter.vformat(formatString, (), CallMapping(mapping))
)
-_theFormatter = Formatter()
+formatter = Formatter()
@@ -710,13 +771,11 @@
# Default observers
# FIXME: ...
#
-theLegacyLogObserver = LegacyLogObserverWrapper(twistedLogMessage)
-
+theLegacyLogObserver = LegacyLogObserver(twistedLogMessage)
theFilteredLogPublisher = LogPublisher(theLegacyLogObserver) # Add post-filtering observers here
-theFilteringLogObserver = LogLevelFilteringLogObserverWrapper(theFilteredLogPublisher)
-theLogPublisher = LogPublisher(theFilteringLogObserver) # Add pre-filtering observers here
+theFilteringLogObserver = FilteringLogObserver(theFilteredLogPublisher, (LogLevelFilterPredicate(),))
+Logger.publisher = LogPublisher(theFilteringLogObserver) # Add pre-filtering observers here
-Logger.publisher = theLogPublisher
######################################################################
Modified: CalendarServer/trunk/twext/python/test/test_log.py
===================================================================
--- CalendarServer/trunk/twext/python/test/test_log.py 2013-06-14 19:25:43 UTC (rev 11365)
+++ CalendarServer/trunk/twext/python/test/test_log.py 2013-06-14 19:32:07 UTC (rev 11366)
@@ -14,16 +14,20 @@
# limitations under the License.
##
+from zope.interface.verify import verifyObject, BrokenMethodImplementation
+
from twisted.python import log as twistedLogging
from twisted.python.failure import Failure
-from twext.python.log import LogLevel, InvalidLogLevelError
-from twext.python.log import logLevelsByNamespace, logLevelForNamespace
-from twext.python.log import setLogLevelForNamespace, clearLogLevels
-from twext.python.log import pythonLogLevelMapping
-from twext.python.log import Logger, LegacyLogger
-
-from twext.python.log import formatWithCall
+from twext.python.log import (
+ LogLevel, InvalidLogLevelError,
+ logLevelsByNamespace,
+ logLevelForNamespace, setLogLevelForNamespace, clearLogLevels,
+ pythonLogLevelMapping,
+ formatEvent, formatWithCall,
+ Logger, LegacyLogger,
+ ILogObserver, LogPublisher,
+)
from twistedcaldav.test.util import TestCase
@@ -84,26 +88,95 @@
-class Logging(TestCase):
+class SetUpTearDown(object):
def setUp(self):
- super(Logging, self).setUp()
+ super(SetUpTearDown, self).setUp()
clearLogLevels()
def tearDown(self):
- super(Logging, self).tearDown()
+ super(SetUpTearDown, self).tearDown()
clearLogLevels()
- def test_repr(self):
+
+class LoggingTests(SetUpTearDown, TestCase):
+ """
+ General module tests.
+ """
+
+ def test_levelWithName(self):
"""
- repr() on Logger
+ Look up log level by name.
"""
- namespace = "bleargh"
- log = Logger(namespace)
- self.assertEquals(repr(log), "<Logger {0}>".format(repr(namespace)))
+ for level in LogLevel.iterconstants():
+ self.assertIdentical(LogLevel.levelWithName(level.name), level)
+ def test_levelWithInvalidName(self):
+ """
+ You can't make up log level names.
+ """
+ bogus = "*bogus*"
+ try:
+ LogLevel.levelWithName(bogus)
+ except InvalidLogLevelError as e:
+ self.assertIdentical(e.level, bogus)
+ else:
+ self.fail("Expected InvalidLogLevelError.")
+
+
+ def test_defaultLogLevel(self):
+ """
+ Default log level is used.
+ """
+ self.failUnless(logLevelForNamespace(None), defaultLogLevel)
+ self.failUnless(logLevelForNamespace(""), defaultLogLevel)
+ self.failUnless(logLevelForNamespace("rocker.cool.namespace"), defaultLogLevel)
+
+
+ def test_setLogLevel(self):
+ """
+ Setting and retrieving log levels.
+ """
+ setLogLevelForNamespace(None, LogLevel.error)
+ setLogLevelForNamespace("twext.web2", LogLevel.debug)
+ setLogLevelForNamespace("twext.web2.dav", LogLevel.warn)
+
+ self.assertEquals(logLevelForNamespace(None ), LogLevel.error)
+ self.assertEquals(logLevelForNamespace("twisted" ), LogLevel.error)
+ self.assertEquals(logLevelForNamespace("twext.web2" ), LogLevel.debug)
+ self.assertEquals(logLevelForNamespace("twext.web2.dav" ), LogLevel.warn)
+ self.assertEquals(logLevelForNamespace("twext.web2.dav.test" ), LogLevel.warn)
+ self.assertEquals(logLevelForNamespace("twext.web2.dav.test1.test2"), LogLevel.warn)
+
+
+ def test_setInvalidLogLevel(self):
+ """
+ Can't pass invalid log levels to setLogLevelForNamespace().
+ """
+ self.assertRaises(InvalidLogLevelError, setLogLevelForNamespace, "twext.web2", object())
+
+ # Level must be a constant, not the name of a constant
+ self.assertRaises(InvalidLogLevelError, setLogLevelForNamespace, "twext.web2", "debug")
+
+
+ def test_clearLogLevels(self):
+ """
+ Clearing log levels.
+ """
+ setLogLevelForNamespace("twext.web2", LogLevel.debug)
+ setLogLevelForNamespace("twext.web2.dav", LogLevel.error)
+
+ clearLogLevels()
+
+ self.assertEquals(logLevelForNamespace("twisted" ), defaultLogLevel)
+ self.assertEquals(logLevelForNamespace("twext.web2" ), defaultLogLevel)
+ self.assertEquals(logLevelForNamespace("twext.web2.dav" ), defaultLogLevel)
+ self.assertEquals(logLevelForNamespace("twext.web2.dav.test" ), defaultLogLevel)
+ self.assertEquals(logLevelForNamespace("twext.web2.dav.test1.test2"), defaultLogLevel)
+
+
def test_namespace_default(self):
"""
Default namespace is module name.
@@ -112,6 +185,76 @@
self.assertEquals(log.namespace, __name__)
+ def test_formatWithCall(self):
+ """
+ L{formatWithCall} is an extended version of L{unicode.format} that will
+ interpret a set of parentheses "C{()}" at the end of a format key to
+ mean that the format key ought to be I{called} rather than stringified.
+ """
+ self.assertEquals(
+ formatWithCall(u"Hello, {world}. {callme()}.",
+ dict(world="earth",
+ callme=lambda: "maybe")),
+ "Hello, earth. maybe."
+ )
+ self.assertEquals(
+ formatWithCall(u"Hello, {repr()!r}.",
+ dict(repr=lambda: 'repr')),
+ "Hello, 'repr'."
+ )
+
+
+ def test_formatEvent(self):
+ """
+ L{formatEvent} will format an event according to several rules:
+
+ - A string with no formatting instructions will be passed straight
+ through.
+
+ - PEP 3101 strings will be formatted using the keys and values of
+ the event as named fields.
+
+ - PEP 3101 keys ending with C{()} will be treated as instructions
+ to call that key (which ought to be a callable) before
+ formatting.
+
+ L{formatEvent} will always return L{unicode}, and if given
+ bytes, will always treat its format string as UTF-8 encoded.
+ """
+ def format(log_format, **event):
+ event["log_format"] = log_format
+ result = formatEvent(event)
+ self.assertIdentical(type(result), unicode)
+ return result
+
+ self.assertEquals(u"", format(""))
+ self.assertEquals(u"abc", format("{x}", x="abc"))
+ self.assertEquals(u"no, yes.",
+ format("{not_called}, {called()}.",
+ not_called="no", called=lambda: "yes"))
+ self.assertEquals(u'S\xe1nchez', format("S\xc3\xa1nchez"))
+ self.assertIn(u"Unable to format event", format(b"S\xe1nchez"))
+ self.assertIn(u"Unable to format event",
+ format(b"S{a}nchez", a=b"\xe1"))
+ self.assertIn(u"S'\\xe1'nchez",
+ format(b"S{a!r}nchez", a=b"\xe1"))
+
+
+
+class LoggerTests(SetUpTearDown, TestCase):
+ """
+ Tests for L{Logger}.
+ """
+
+ def test_repr(self):
+ """
+ repr() on Logger
+ """
+ namespace = "bleargh"
+ log = Logger(namespace)
+ self.assertEquals(repr(log), "<Logger {0}>".format(repr(namespace)))
+
+
def test_namespace_attribute(self):
"""
Default namespace for classes using L{Logger} as a descriptor is the
@@ -139,7 +282,7 @@
self.assertIn("log_source", log.event)
self.assertEquals(log.event["log_source"], obj)
- stuff = log.formatEvent(log.event)
+ stuff = formatEvent(log.event)
self.assertIn("Hello, <LogComposedObject hello>.", stuff)
@@ -176,7 +319,7 @@
# FIXME: this checks the end of message because we do formatting in emit()
self.assertEquals(
- log.formatEvent(log.event),
+ formatEvent(log.event),
message
)
else:
@@ -227,157 +370,107 @@
self.assertEquals(log.event["log_source"], None)
- def test_defaultLogLevel(self):
+ def test_setLevel(self):
"""
- Default log level is used.
+ Set level on the logger directly.
"""
- self.failUnless(logLevelForNamespace(None), defaultLogLevel)
- self.failUnless(logLevelForNamespace(""), defaultLogLevel)
- self.failUnless(logLevelForNamespace("rocker.cool.namespace"), defaultLogLevel)
+ log = Logger()
+ for level in (LogLevel.error, LogLevel.info):
+ log.setLevel(level)
+ self.assertIdentical(level, log.level())
+ self.assertIdentical(level, logLevelForNamespace(log.namespace))
- def test_logLevelWithName(self):
+
+ def test_logInvalidLogLevel(self):
"""
- Look up log level by name.
+ Test passing in a bogus log level to C{emit()}.
"""
- for level in LogLevel.iterconstants():
- self.assertIdentical(LogLevel.levelWithName(level.name), level)
+ log = TestLogger()
+ log.emit("*bogus*")
- def test_logLevelWithInvalidName(self):
- """
- You can't make up log level names.
- """
- bogus = "*bogus*"
- try:
- LogLevel.levelWithName(bogus)
- except InvalidLogLevelError as e:
- self.assertIdentical(e.level, bogus)
- else:
- self.fail("Expected InvalidLogLevelError.")
+ errors = self.flushLoggedErrors(InvalidLogLevelError)
+ self.assertEquals(len(errors), 1)
- def test_setLogLevel(self):
- """
- Setting and retrieving log levels.
- """
- setLogLevelForNamespace(None, LogLevel.error)
- setLogLevelForNamespace("twext.web2", LogLevel.debug)
- setLogLevelForNamespace("twext.web2.dav", LogLevel.warn)
- self.assertEquals(logLevelForNamespace(None ), LogLevel.error)
- self.assertEquals(logLevelForNamespace("twisted" ), LogLevel.error)
- self.assertEquals(logLevelForNamespace("twext.web2" ), LogLevel.debug)
- self.assertEquals(logLevelForNamespace("twext.web2.dav" ), LogLevel.warn)
- self.assertEquals(logLevelForNamespace("twext.web2.dav.test" ), LogLevel.warn)
- self.assertEquals(logLevelForNamespace("twext.web2.dav.test1.test2"), LogLevel.warn)
+class LogPublisherTests(SetUpTearDown, TestCase):
+ """
+ Tests for L{LogPublisher}.
+ """
-
- def test_setInvalidLogLevel(self):
+ def test_interface(self):
"""
- Can't pass invalid log levels to setLogLevelForNamespace().
+ L{LogPublisher} is an L{ILogObserver}.
"""
- self.assertRaises(InvalidLogLevelError, setLogLevelForNamespace, "twext.web2", object())
+ publisher = LogPublisher()
+ try:
+ verifyObject(ILogObserver, publisher)
+ except BrokenMethodImplementation as e:
+ self.fail(e)
- # Level must be a constant, not the name of a constant
- self.assertRaises(InvalidLogLevelError, setLogLevelForNamespace, "twext.web2", "debug")
-
- def test_clearLogLevel(self):
+ def test_observers(self):
"""
- Clearing log levels.
+ L{LogPublisher.observers} returns the observers.
"""
- setLogLevelForNamespace("twext.web2", LogLevel.debug)
- setLogLevelForNamespace("twext.web2.dav", LogLevel.error)
+ o1 = lambda e: None
+ o2 = lambda e: None
- clearLogLevels()
+ publisher = LogPublisher(o1, o2)
+ self.assertEquals(set((o1, o2)), set(publisher.observers))
- self.assertEquals(logLevelForNamespace("twisted" ), defaultLogLevel)
- self.assertEquals(logLevelForNamespace("twext.web2" ), defaultLogLevel)
- self.assertEquals(logLevelForNamespace("twext.web2.dav" ), defaultLogLevel)
- self.assertEquals(logLevelForNamespace("twext.web2.dav.test" ), defaultLogLevel)
- self.assertEquals(logLevelForNamespace("twext.web2.dav.test1.test2"), defaultLogLevel)
-
- def test_setLevelOnLogger(self):
+ def test_addObserver(self):
"""
- Set level on the logger directly.
+ L{LogPublisher.addObserver} adds an observer.
"""
- log = Logger()
+ o1 = lambda e: None
+ o2 = lambda e: None
+ o3 = lambda e: None
- for level in (LogLevel.error, LogLevel.info):
- log.setLevel(level)
- self.assertIdentical(level, log.level())
- self.assertIdentical(level, logLevelForNamespace(log.namespace))
+ publisher = LogPublisher(o1, o2)
+ publisher.addObserver(o3)
+ self.assertEquals(set((o1, o2, o3)), set(publisher.observers))
- def test_logInvalidLogLevel(self):
+ def test_removeObserver(self):
"""
- Test passing in a bogus log level to C{emit()}.
+ L{LogPublisher.removeObserver} removes an observer.
"""
- log = TestLogger()
+ o1 = lambda e: None
+ o2 = lambda e: None
+ o3 = lambda e: None
- log.emit("*bogus*")
+ publisher = LogPublisher(o1, o2, o3)
+ publisher.removeObserver(o2)
+ self.assertEquals(set((o1, o3)), set(publisher.observers))
- errors = self.flushLoggedErrors(InvalidLogLevelError)
- self.assertEquals(len(errors), 1)
-
- def test_formatWithCall(self):
+ def test_fanOut(self):
"""
- L{formatWithCall} is an extended version of L{unicode.format} that will
- interpret a set of parentheses "C{()}" at the end of a format key to
- mean that the format key ought to be I{called} rather than stringified.
+ L{LogPublisher} calls its observers.
"""
- self.assertEquals(
- formatWithCall(u"Hello, {world}. {callme()}.",
- dict(world="earth",
- callme=lambda: "maybe")),
- "Hello, earth. maybe."
- )
- self.assertEquals(
- formatWithCall(u"Hello, {repr()!r}.",
- dict(repr=lambda: 'repr')),
- "Hello, 'repr'."
- )
+ e1 = []
+ e2 = []
+ e3 = []
+ o1 = lambda e: e1.append(e)
+ o2 = lambda e: e2.append(e)
+ o3 = lambda e: e3.append(e)
- def test_formatEvent(self):
- """
- L{Logger.formatEvent} will format an event according to several rules:
+ publisher = LogPublisher(o1, o2, o3)
+ publisher.removeObserver(o2)
+ self.assertEquals(set((o1, o3)), set(publisher.observers))
- - A string with no formatting instructions will be passed straight
- through.
- - PEP 3101 strings will be formatted using the keys and values of
- the event as named fields.
- - PEP 3101 keys ending with C{()} will be treated as instructions
- to call that key (which ought to be a callable) before
- formatting.
+class LegacyLoggerTests(SetUpTearDown, TestCase):
+ """
+ Tests for L{LegacyLogger}.
+ """
- L{Logger.formatEvent} will always return L{unicode}, and if given
- bytes, will always treat its format string as UTF-8 encoded.
- """
- def formatEvent(log_format, **event):
- event["log_format"] = log_format
- result = Logger.formatEvent(event)
- self.assertIdentical(type(result), unicode)
- return result
-
- self.assertEquals(u"", formatEvent(""))
- self.assertEquals(u"abc", formatEvent("{x}", x="abc"))
- self.assertEquals(u"no, yes.",
- formatEvent("{not_called}, {called()}.",
- not_called="no", called=lambda: "yes"))
- self.assertEquals(u'S\xe1nchez', formatEvent("S\xc3\xa1nchez"))
- self.assertIn(u"Unable to format event", formatEvent(b"S\xe1nchez"))
- self.assertIn(u"Unable to format event",
- formatEvent(b"S{a}nchez", a=b"\xe1"))
- self.assertIn(u"S'\\xe1'nchez",
- formatEvent(b"S{a!r}nchez", a=b"\xe1"))
-
-
def test_legacy_msg(self):
"""
Test LegacyLogger's log.msg()
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20130614/40b2cc6b/attachment-0001.html>
More information about the calendarserver-changes
mailing list