Source code for crossbar._logging

#####################################################################################
#
#  Copyright (c) typedef int GmbH
#  SPDX-License-Identifier: EUPL-1.2
#
#####################################################################################

import json
import os
import re
import sys
from io import StringIO
from json import JSONEncoder

from pygments import formatters, highlight, lexers
from twisted.logger import ILogObserver, LogLevel, formatEvent, formatTime, globalLogPublisher
from txaio import get_global_log_level, set_global_log_level
from txaio.tx import log_levels
from zope.interface import provider

from crossbar import _log_categories

# ASCII "record separator" control character (0x1E). make_JSON_observer
# prints it after every JSON record instead of a newline.
record_separator = "\x1e"

# Marker string in NAME=value form. It is not referenced anywhere else in this
# module; presumably other parts of crossbar look for it -- confirm before
# changing the value.
cb_logging_aware = "CROSSBAR_RICH_LOGGING_ENABLE=True"
try:
    from colorama import Fore
except ImportError:
    # colorama is optional. Without it, ``Fore`` becomes a stand-in exposing
    # the same attribute names, each an empty string, so code that reads
    # ``Fore.<COLOR>`` keeps working and simply emits no colour codes.
    class _Fore:
        """Stand-in for ``colorama.Fore`` where every colour code is empty."""

        BLACK = RED = GREEN = YELLOW = BLUE = MAGENTA = CYAN = WHITE = ""
        RESET = ""
        LIGHTBLACK_EX = LIGHTRED_EX = LIGHTGREEN_EX = LIGHTYELLOW_EX = ""
        LIGHTBLUE_EX = LIGHTMAGENTA_EX = LIGHTCYAN_EX = LIGHTWHITE_EX = ""

    Fore = _Fore()
# Line templates used by the observers in this module. Each is filled in with
# ``str.format`` using the keyword arguments ``startcolor``, ``time``,
# ``system``, ``endcolor`` and ``text``; a template may ignore some of them.

# Timestamp and system label, then the message.
STANDARD_FORMAT = "{startcolor}{time} [{system}]{endcolor} {text}"

# Same as the standard layout but without the timestamp.
SYSLOGD_FORMAT = "{startcolor}[{system}]{endcolor} {text}"

# Message text only.
NONE_FORMAT = "{text}"
# A regex that matches ANSI escape sequences
# http://stackoverflow.com/a/33925425
_ansi_cleaner = re.compile(r"(\x9B|\x1B\[)[0-?]*[ -\/]*[@-~]")


def strip_ansi(text):
    """
    Return C{text} with every ANSI escape sequence removed.
    """
    cleaned = _ansi_cleaner.sub("", text)
    return cleaned
def escape_formatting(text):
    """
    Double every brace in C{text} so that a later C{.format} call treats the
    braces as literal characters rather than replacement fields.
    """
    opening_escaped = text.replace("{", "{{")
    fully_escaped = opening_escaped.replace("}", "}}")
    return fully_escaped
def make_stdout_observer(
    levels=(LogLevel.info,),
    show_source=False,
    format="standard",
    trace=False,
    color=False,
    _file=None,
    _categories=None,
):
    """
    Create an observer which prints logs to L{sys.stdout}.

    @param levels: Log levels to print; events at any other level are dropped.
    @param show_source: If true, append the event's C{cb_namespace} (falling
        back to C{log_namespace}) to the system label.
    @param format: Line layout, one of C{"standard"}, C{"syslogd"} or
        C{"none"}.
    @param trace: If false, debug events flagged with C{txaio_trace} are
        dropped.
    @param color: If true, colour codes chosen from the system label are
        passed to the line template; if false, ANSI escape sequences are
        stripped from the finished line.
    @param _file: Stream to print to; defaults to C{sys.__stdout__}.
    @param _categories: Mapping of log category to format string; defaults to
        C{_log_categories.log_categories}.

    @return: The observer callable.
    @raise AssertionError: Raised by the returned observer, when it renders an
        event, if C{format} is not one of the supported values.
    """
    if _file is None:
        _file = sys.__stdout__

    if _categories is None:
        _categories = _log_categories.log_categories

    # The template depends only on ``format``, so resolve it once here rather
    # than on every event. ``None`` marks an unsupported value.
    if format == "standard":
        line_template = STANDARD_FORMAT
    elif format == "syslogd":
        line_template = SYSLOGD_FORMAT
    elif format == "none":
        line_template = NONE_FORMAT
    else:
        line_template = None

    @provider(ILogObserver)
    def StandardOutObserver(event):
        if event["log_level"] not in levels:
            return

        if event["log_level"] == LogLevel.debug:
            if event.get("txaio_trace", False) and not trace:
                return

        # Events without a system label are attributed to this process.
        if event.get("log_system", "-") == "-":
            logSystem = "{:<10} {:>6}".format("Controller", os.getpid())
        else:
            logSystem = event["log_system"]

        if show_source and event.get("log_namespace") is not None:
            logSystem += " " + event.get("cb_namespace", event.get("log_namespace", ""))

        if event.get("log_category"):
            format_string = _categories.get(event["log_category"])
            if format_string:
                # Work on a copy so the dict passed in is left unmodified.
                event = event.copy()
                event["log_format"] = format_string

        if line_template is None:
            # Raised explicitly instead of via ``assert False``: a bare assert
            # is removed under ``python -O``, which previously left the
            # template unbound and failed with an UnboundLocalError.
            raise AssertionError("unsupported log format: {!r}".format(format))

        if color:
            # Choose a color depending on where the log came from.
            if "Controller" in logSystem:
                startcolor = Fore.BLUE
            elif "Router" in logSystem:
                startcolor = Fore.YELLOW
            elif "Container" in logSystem:
                startcolor = Fore.GREEN
            else:
                startcolor = Fore.WHITE
            endcolor = Fore.RESET
        else:
            startcolor = endcolor = ""

        eventString = line_template.format(
            startcolor=startcolor,
            time=formatTime(event["log_time"]),
            system=logSystem,
            endcolor=endcolor,
            text=formatEvent(event),
        )

        if not color:
            # The message text itself may carry escape sequences.
            eventString = strip_ansi(eventString)

        print(eventString, file=_file)

    return StandardOutObserver
def make_stderr_observer(
    levels=(LogLevel.warn, LogLevel.error, LogLevel.critical),
    show_source=False,
    format="standard",
    color=False,
    _file=None,
    _categories=None,
):
    """
    Create an observer which prints logs to L{sys.stderr}.

    @param levels: Log levels to print; events at any other level are dropped.
    @param show_source: If true, append the event's C{cb_namespace} (falling
        back to C{log_namespace}) to the system label.
    @param format: Line layout, one of C{"standard"}, C{"syslogd"} or
        C{"none"}.
    @param color: If true, C{Fore.RED} is passed to the line template as the
        start colour; if false, ANSI escape sequences are stripped from the
        finished line.
    @param _file: Stream to print to; defaults to C{sys.__stderr__}.
    @param _categories: Mapping of log category to format string; defaults to
        C{_log_categories.log_categories}.

    @return: The observer callable.
    @raise AssertionError: Raised by the returned observer, when it renders an
        event, if C{format} is not one of the supported values.
    """
    if _file is None:
        _file = sys.__stderr__

    if _categories is None:
        _categories = _log_categories.log_categories

    # The template depends only on ``format``, so resolve it once here rather
    # than on every event. ``None`` marks an unsupported value.
    if format == "standard":
        line_template = STANDARD_FORMAT
    elif format == "syslogd":
        line_template = SYSLOGD_FORMAT
    elif format == "none":
        line_template = NONE_FORMAT
    else:
        line_template = None

    @provider(ILogObserver)
    def StandardErrorObserver(event):
        if event["log_level"] not in levels:
            return

        # Events without a system label are attributed to this process.
        if event.get("log_system", "-") == "-":
            logSystem = "{:<10} {:>6}".format("Controller", os.getpid())
        else:
            logSystem = event["log_system"]

        if show_source and event.get("log_namespace") is not None:
            logSystem += " " + event.get("cb_namespace", event.get("log_namespace", ""))

        if event.get("log_category"):
            format_string = _categories.get(event["log_category"])
            if format_string:
                # Work on a copy so the dict passed in is left unmodified.
                event = event.copy()
                event["log_format"] = format_string

        # Events without a format (e.g. bare failures) have no message text.
        if event.get("log_format", None) is not None:
            eventText = formatEvent(event)
        else:
            eventText = ""

        if "log_failure" in event:
            # This is a traceback. Print it.
            eventText = eventText + event["log_failure"].getTraceback()

        if line_template is None:
            # Raised explicitly instead of via ``assert False``: a bare assert
            # is removed under ``python -O``, which previously left the
            # template unbound and failed with an UnboundLocalError.
            raise AssertionError("unsupported log format: {!r}".format(format))

        if color:
            # Errors are always red.
            startcolor = Fore.RED
            endcolor = Fore.RESET
        else:
            startcolor = endcolor = ""

        eventString = line_template.format(
            startcolor=startcolor,
            time=formatTime(event["log_time"]),
            system=logSystem,
            endcolor=endcolor,
            text=eventText,
        )

        if not color:
            # The message text itself may carry escape sequences.
            eventString = strip_ansi(eventString)

        print(eventString, file=_file)

    return StandardErrorObserver
def make_JSON_observer(outFile):
    """
    Make an observer which writes JSON to C{outFile}.

    Each event that passes the level check is written as one JSON document
    followed by C{record_separator}, and C{outFile} is flushed after every
    record.

    @param outFile: Writable text stream; must support C{flush()}.

    @return: The observer callable.
    """

    class CrossbarEncoder(JSONEncoder):
        def default(self, o):
            # Anything json cannot serialise natively is replaced by its
            # brace-escaped repr.
            return escape_formatting(repr(o))

    encoder = CrossbarEncoder()

    # Keys removed from the event before it is serialised.
    dropped_keys = ("log_logger", "log_format", "log_source", "log_system", "log_failure", "failure")

    @provider(ILogObserver)
    def _make_json(_event):
        # Copy, so the pops below do not modify the caller's event.
        event = dict(_event)
        level = event.pop("log_level", LogLevel.info).name

        # as soon as possible, we wish to give up if this event is
        # outside our target log-level; this is to prevent
        # (de-)serializing all the debug() messages (for example) from
        # workers to the controller.
        if log_levels.index(level) > log_levels.index(get_global_log_level()):
            return

        done_json = {"level": level, "namespace": event.pop("log_namespace", "")}

        eventText = formatEvent(event)

        if "log_failure" in event:
            # This is a traceback. Print it.
            tb_text = event["log_failure"].getTraceback()
            eventText = eventText + os.linesep + tb_text

        done_json["text"] = escape_formatting(eventText)

        # These calls cannot raise on a plain dict, so they sit outside the
        # try block.
        for key in dropped_keys:
            event.pop(key, "")
        event.update(done_json)

        try:
            text = encoder.encode(event)
        except Exception:
            # Best effort: if the full event cannot be encoded, still emit the
            # rendered text as an error record attributed to this module.
            text = encoder.encode({"text": done_json["text"], "level": "error", "namespace": "crossbar._logging"})

        # JSONEncoder.encode() always returns str on Python 3, so the former
        # Python 2 bytes-decoding step was unreachable and has been removed.
        print(text, end=record_separator, file=outFile)
        outFile.flush()

    return _make_json
def make_logfile_observer(path, show_source=False):
    """
    Make an observer that writes out to C{path}.

    Lines use C{STANDARD_FORMAT} without colour codes, have ANSI escape
    sequences stripped, and end with C{os.linesep}.

    @param path: Full path of the log file, passed to
        C{DailyLogFile.fromFullPath}.
    @param show_source: If true, append the event's C{cb_namespace} (falling
        back to C{log_namespace}) to the system label.

    @return: A C{FileLogObserver} writing to that file.
    """
    from twisted.logger import FileLogObserver
    from twisted.python.logfile import DailyLogFile

    def _render(event):
        # Events without a system label are attributed to this process.
        system_label = event.get("log_system", "-")
        if system_label == "-":
            system_label = "{:<10} {:>6}".format("Controller", os.getpid())

        if show_source and event.get("log_namespace") is not None:
            source = event.get("cb_namespace", event.get("log_namespace", ""))
            system_label = system_label + " " + source

        message = formatEvent(event) if event.get("log_format", None) is not None else ""

        if "log_failure" in event:
            # This is a traceback. Print it.
            message = message + event["log_failure"].getTraceback()

        line = STANDARD_FORMAT.format(
            startcolor="",
            time=formatTime(event["log_time"]),
            system=system_label,
            endcolor="",
            text=message,
        )
        return strip_ansi(line) + os.linesep

    log_file = DailyLogFile.fromFullPath(path)
    return FileLogObserver(log_file, _render)
def color_json(json_str):
    """
    Given an already formatted JSON string, return a colored variant which will
    produce colored output on terminals.

    @param json_str: The JSON text to highlight; must be a C{str}.
    @return: The result of pygments' C{highlight} using the JSON lexer and the
        terminal formatter.
    """
    assert isinstance(json_str, str)

    json_lexer = lexers.JsonLexer()
    terminal_formatter = formatters.TerminalFormatter()
    return highlight(json_str, json_lexer, terminal_formatter)
class JSON(object):
    """
    An object which encapsulates a JSON-dumpable item, and will colorise it
    when it is __str__'d.
    """

    def __init__(self, item):
        # The item is stored as-is and only serialised in __str__.
        self._item = item

    def __str__(self):
        """
        Return the item as indented JSON, passed through C{color_json} and
        preceded by C{os.linesep}.
        """
        json_str = json.dumps(self._item, separators=(", ", ": "), sort_keys=False, indent=3, ensure_ascii=False)
        # The former ``if bytes == str`` re-encoding branch could only run on
        # Python 2 and was unreachable here, so it has been removed.
        return os.linesep + color_json(json_str)
class LogCapturer(object):
    """
    A context manager that captures logs inside of it, and makes it available
    through the logs attribute, or the get_category method.

    While active, the global log level is set to C{level}; the level that was
    in effect when the block was entered is restored on exit. Captured events
    are also rendered into C{log_text}.
    """

    def __init__(self, level="debug"):
        # Raw events, in the order they were observed.
        self.logs = []
        # Initialised here so the attribute always exists; refreshed in
        # __enter__ with the level actually in effect at that point.
        self._old_log_level = get_global_log_level()
        self.desired_level = level
        # Rendered text of the captured events.
        self.log_text = StringIO()
        self._out_observer = make_stdout_observer(
            levels=(LogLevel.debug, LogLevel.info, LogLevel.warn, LogLevel.error),
            _file=self.log_text,
            trace=True,
        )

    def get_category(self, identifier):
        """
        Get logs captured with the given log category.
        """
        return [x for x in self.logs if x.get("log_category") == identifier]

    def _got_log(self, log):
        self.logs.append(log)
        # Render them, to make sure there are no "can't format" errors
        self._out_observer(log)

    def __enter__(self):
        # Re-read the level on entry: the value taken in __init__ is stale if
        # the global level changed since construction or the capturer is being
        # reused, and restoring it on exit would then set the wrong level.
        self._old_log_level = get_global_log_level()
        set_global_log_level(self.desired_level)
        globalLogPublisher.addObserver(self._got_log)
        return self

    def __exit__(self, type, value, traceback):
        # Exceptions from the with-body are not suppressed (returns None).
        globalLogPublisher.removeObserver(self._got_log)
        set_global_log_level(self._old_log_level)