# Source code for crossbar.edge.worker.tracing

##############################################################################
#
#                        Crossbar.io
#     Copyright (C) typedef int GmbH. All rights reserved.
#
##############################################################################

import math
import uuid
from collections import deque
from datetime import datetime

import six
from autobahn import util
from autobahn.wamp import message
from twisted.internet.task import LoopingCall
from txaio import make_logger, perf_counter_ns, time_ns

# public API of this module for `from ... import *`: only the router trace class
__all__ = ("FabricRouterTrace",)

# import pyarrow as pa
# import pyarrow.parquet as pq
#
# pd.Timestamp (np.datetime64[ns])
# pa.Array.from_buffers
# pa.RecordBatch.from_arrays
# pa.Table.from_batches
# pq.ParquetFile.write_table

# a = pa.array([random.randint(0, 2**32-1) for i in range(1000)], type=pa.uint64())

# https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table
# https://arrow.apache.org/docs/python/generated/pyarrow.RecordBatch.html#pyarrow.RecordBatch
# https://arrow.apache.org/docs/python/generated/pyarrow.Array.html#pyarrow.Array


class TracedMessage(object):
    """
    A single WAMP message captured by a trace running at ``trace_level == "message"``.

    On construction, a wall-clock timestamp (``ts``, from ``time_ns``) and a
    performance-counter timestamp (``pc``, from ``perf_counter_ns``) are taken,
    and the routing context of the session the message was received from or
    sent to is stored alongside the message object itself.
    """

    __slots__ = (
        "ts",
        "pc",
        "seq",
        "realm",
        "direction",
        "session_id",
        "authid",
        "authrole",
        "msg",
    )

    def __init__(self, seq, realm, direction, session_id, authid, authrole, msg):
        """
        :param seq: Sequence number of this record within its trace.
        :param realm: Realm of the session.
        :param direction: Direction of the message as passed by the tracer (``"rx"`` or ``"tx"``).
        :param session_id: WAMP session ID of the session.
        :param authid: Authentication ID of the session.
        :param authrole: Authentication role of the session.
        :param msg: The traced WAMP message object.
        """
        # capture both clocks first, as close to the moment of tracing as possible
        self.ts = time_ns()
        self.pc = perf_counter_ns()

        self.seq = seq
        self.realm = realm
        self.direction = direction

        self.session_id = session_id
        self.authid = authid
        self.authrole = authrole

        self.msg = msg

    def marshal(self, include_message=False):
        """
        Convert this trace record into a plain dict.

        The ``enc_algo`` / ``enc_key`` / ``enc_serializer`` entries are taken from
        the message when it has such attributes, and are ``None`` otherwise.

        :param include_message: If true, also include the message's own
            ``marshal()`` output under the key ``"msg"``.
        :type include_message: bool
        :returns: The marshalled trace record.
        :rtype: dict
        """
        wamp_msg = self.msg

        record = {
            "ts": self.ts,
            "pc": self.pc,
            "seq": self.seq,
            "realm": self.realm,
            "direction": self.direction,
            "session_id": self.session_id,
            "authid": self.authid,
            "authrole": self.authrole,
            "msg_type": six.text_type(wamp_msg.__class__.__name__),
            "correlation": wamp_msg.correlation_id,
            "correlation_uri": wamp_msg.correlation_uri,
            "correlation_is_anchor": wamp_msg.correlation_is_anchor,
            "correlation_is_last": wamp_msg.correlation_is_last,
            "enc_algo": getattr(wamp_msg, "enc_algo", None),
            "enc_key": getattr(wamp_msg, "enc_key", None),
            "enc_serializer": getattr(wamp_msg, "enc_serializer", None),
        }

        # size of each serialization of the message, keyed by the serializer's NAME;
        # NOTE(review): `_serialized` is a private attribute of the message object,
        # presumably a per-serializer cache of serialized bytes — confirm
        record["serializations"] = {
            serializer.NAME: len(payload) for serializer, payload in wamp_msg._serialized.items()
        }

        if include_message:
            # forward the raw (marshalled) WAMP message as well
            record["msg"] = wamp_msg.marshal()

        return record


class TracedAction(object):
    """
    An RPC/PubSub action captured by a trace running at ``trace_level == "action"``.

    A traced action is identified by its WAMP correlation ID. It records the
    originating session, the list of responders, and whether the action
    finished successfully (``success`` stays ``None`` until that is known).
    """

    # NOTE: "originator_enc" and "responders_enc" are declared here but never
    # assigned by this class; reading them on an instance raises AttributeError
    # unless some other code sets them.
    __slots__ = (
        "correlation_id",
        "correlation_uri",
        "ts",
        "pc",
        "seq",
        "realm",
        "action",
        "originator",
        "responders",
        "originator_enc",
        "responders_enc",
        "success",
    )

    def __init__(self, correlation_id, correlation_uri, seq, realm, action, originator, responders):
        """
        :param correlation_id: Correlation ID of the WAMP messages forming this action.
        :param correlation_uri: URI the action refers to.
        :param seq: Sequence number of this record within its trace.
        :param realm: Realm the action happened on.
        :param action: Name of the anchor message type (e.g. ``"Call"``).
        :param originator: Session ID of the session that started the action.
        :param responders: List collecting responder records (mutated by the tracer).
        """
        self.correlation_id, self.correlation_uri = correlation_id, correlation_uri

        # wall-clock and performance-counter timestamps of action creation
        self.ts = time_ns()
        self.pc = perf_counter_ns()

        self.seq, self.realm, self.action = seq, realm, action
        self.originator, self.responders = originator, responders

        # unknown until the last message of the correlation has been seen
        self.success = None

    def marshal(self):
        """
        Convert this traced action into a plain dict.

        :returns: Dict with the timestamps, sequence number, realm, action name,
            correlation ID/URI, originator, responders and success flag.
        :rtype: dict
        """
        exported = (
            "ts",
            "pc",
            "seq",
            "realm",
            "action",
            "correlation_id",
            "correlation_uri",
            "originator",
            "responders",
            "success",
        )
        return {name: getattr(self, name) for name in exported}


class FabricRouterTrace(object):
    """
    In-memory trace of the WAMP traffic passing through a router worker.

    How it works: a trace is always run from a router worker process. The router
    code calls into :meth:`maybe_trace_rx_msg` / :meth:`maybe_trace_tx_msg` to
    trace messages as they are received by / sent from the router. These two
    methods check whether the message is to be traced in the first place, and if
    so, create a trace record and append it to an in-memory list in the same
    (main) thread.

    Every ``batching_period`` milliseconds a looping call closes the current
    batch. It passes the buffered records to the optional
    ``on_trace_period_finished`` callback and appends them to a bounded
    in-memory history, which :meth:`get_data` reads from.

    NOTE(review): earlier design notes describe forwarding each batch to a
    background thread that writes it to a per-trace LMDB database. No such code
    exists in this class; ``persist=True`` currently only assigns a
    ``persistent_id``. Confirm whether persistence is handled elsewhere.
    """

    log = make_logger()

    # WAMP message types that, when they are the anchor of a correlation, open a
    # traced action at trace_level == "action" (RPC and PubSub related actions)
    _ACTION_ANCHOR_TYPES = (
        message.Call,
        message.Register,
        message.Unregister,
        message.Publish,
        message.Subscribe,
        message.Unsubscribe,
    )

    def __init__(
        self,
        session,
        trace_id,
        on_trace_period_finished=None,
        trace_level="message",
        trace_app_payload=False,
        batching_period=200,
        persist=False,
        duration=None,
        limit=60,
    ):
        """
        :param session: The router worker session owning this trace. Its ``_node_id``
            and ``_worker_id`` attributes are reported by :meth:`marshal`.
        :param trace_id: The ID assigned to the trace within the router-realm.
        :type trace_id: str
        :param on_trace_period_finished: Optional callback fired at the end of every
            batching period as ``on_trace_period_finished(trace_id, period, batch)``.
        :type on_trace_period_finished: callable or None
        :param trace_level: ``"message"`` records every traced WAMP message as a
            :class:`TracedMessage`. ``"action"`` records RPC/PubSub actions as
            :class:`TracedAction`, from their anchor message to their last message.
        :type trace_level: str
        :param trace_app_payload: Flag to control tracing of the actual app _payload_ (args, kwargs).
        :type trace_app_payload: bool
        :param batching_period: The batching period in ms.
        :type batching_period: int
        :param persist: Flag to control trace persistence (to disk, that is LMDB).
            Currently this only causes a ``persistent_id`` to be generated.
        :type persist: bool
        :param duration: Run time of the trace in secs. It is stored and reported by
            :meth:`marshal`. NOTE(review): this class does not stop the trace
            automatically after ``duration``; confirm the caller does.
        :type duration: int
        :param limit: Limit in secs of the history kept for the trace.
        :type limit: int
        """
        self._session = session
        self._trace_id = trace_id
        self._on_trace_period_finished = on_trace_period_finished
        self._trace_level = trace_level
        self._trace_app_payload = trace_app_payload
        self._batching_period = batching_period
        self._persist = persist
        self._duration = duration
        self._limit = limit

        # one of "created", "running", "stopped"
        self._status = "created"
        self._persistent_id = str(uuid.uuid4()) if persist else None

        # UTC start / end of the trace (naive datetimes)
        self._started = None
        self._ended = None

        # next sequence number handed out to a trace record
        self._seq = 0

        # number and start time of the currently open batching period
        self._period = 0
        self._period_ts = None

        # the current batch of tracing records: TracedMessage instances for
        # trace_level == "message", finished TracedAction instances for "action"
        self._current_batch = []

        # for trace_level == "action": map of open actions, correlation ID -> TracedAction
        self._open_actions = {}

        # the accumulated history of the trace, as one (period, batch) entry per
        # batching period, bounded so that about `limit` seconds are retained
        max_periods = int(math.ceil(float(limit) * 1000.0 / float(batching_period)))
        self._trace = deque(maxlen=max_periods)

        # the looping call that closes the current batch every batching period
        self._batch_looper = None

    def _batch_loop(self):
        """
        Close the current batching period and open the next one.

        This builds a period descriptor and fires the user callback (if any) with
        the period's batch. It then appends ``(period, batch)`` to the bounded
        history and starts the next period with a fresh, empty batch.
        """
        current_batch = self._current_batch
        period = {
            "finished_ts": time_ns(),
            "finished_pc": perf_counter_ns(),
            "period": self._period,
            "period_start": util.utcstr(self._period_ts),
            "first_seq": current_batch[0].seq if current_batch else None,
            "last_seq": current_batch[-1].seq if current_batch else None,
        }

        # fire user callback with current batch. An exception escaping here would
        # stop the looping call for good while the trace stays "running", so log it
        # and carry on instead.
        if self._on_trace_period_finished:
            try:
                self._on_trace_period_finished(self._trace_id, period, current_batch)
            except Exception:
                self.log.failure(
                    "user callback on_trace_period_finished failed for trace {trace_id}", trace_id=self._trace_id
                )

        # append current batch to history
        self._trace.append((period, current_batch))

        # next period: always start a new list, even when the closed batch was empty,
        # because the closed list is now owned by the history (and possibly the
        # callback) and must not receive records of later periods
        self._period += 1
        self._period_ts = datetime.utcnow()
        self._current_batch = []

    def start(self):
        """
        Start the trace and its batching loop. This only has an effect in status
        "created"; otherwise a warning is logged.
        """
        if self._status != "created":
            self.log.warn('skip starting of Trace not in status "created", but "{status}"', status=self._status)
            return

        self._status = "running"
        self._started = datetime.utcnow()
        self._period_ts = self._started
        self._batch_looper = LoopingCall(self._batch_loop)
        # batching_period is in ms, LoopingCall expects seconds
        self._batch_looper.start(float(self._batching_period) / 1000.0)

    def stop(self):
        """
        Stop the trace and its batching loop. This only has an effect in status
        "running"; otherwise a warning is logged.
        """
        if self._status != "running":
            self.log.warn('skip stopping of Trace not in status "running", but "{status}"', status=self._status)
            return

        looper = self._batch_looper
        if looper is not None and looper.running:
            looper.stop()
        self._batch_looper = None
        self._status = "stopped"
        self._ended = datetime.utcnow()

    def marshal(self):
        """
        Return a dict describing this trace: IDs, status, start/end/runtime, the
        options it was created with, and the next period and sequence numbers.

        ``runtime`` is in seconds. It is measured up to now while the trace has
        not ended, and is ``None`` if the trace was never started.
        """
        if self._started:
            end = self._ended if self._ended else datetime.utcnow()
            runtime = (end - self._started).total_seconds()
        else:
            runtime = None

        data = {
            "id": self._trace_id,
            "node_id": self._session._node_id,
            "worker_id": self._session._worker_id,
            "persistent_id": self._persistent_id,
            "status": self._status,
            "started": util.utcstr(self._started) if self._started else None,
            "ended": util.utcstr(self._ended) if self._ended else None,
            "runtime": runtime,
            "options": {
                "trace_level": self._trace_level,
                "trace_app_payload": self._trace_app_payload,
                "batching_period": self._batching_period,
                "persist": self._persist,
                "duration": self._duration,
                "limit": self._limit,
            },
            "next_period": self._period,
            "next_seq": self._seq,
        }
        return data

    def get_data(self, from_seq, to_seq, limit):
        """
        Return marshalled trace records from the retained history, oldest period first.

        :param from_seq: Currently ignored (see FIXME).
        :param to_seq: Currently ignored (see FIXME).
        :param limit: Maximum number of records to return, or ``None`` for no limit.
        :returns: List of marshalled records (dicts).
        :rtype: list
        """
        # FIXME: implement selection for sequence range (from_seq / to_seq)
        res = []
        for _period, batch in self._trace:
            if not batch:
                continue
            if self._trace_level == "message":
                res.extend(trace_record.marshal(self._trace_app_payload) for trace_record in batch)
            elif self._trace_level == "action":
                res.extend(traced_action.marshal() for traced_action in batch)
            else:
                raise Exception("logic error")
            if limit is not None and len(res) > limit:
                return res[:limit]
        return res

    def maybe_trace_rx_msg(self, session, msg):
        """Trace a message received by the router from ``session`` (if tracing is active)."""
        self._maybe_trace_msg(session, msg, "rx")

    def maybe_trace_tx_msg(self, session, msg):
        """Trace a message sent by the router to ``session`` (if tracing is active)."""
        self._maybe_trace_msg(session, msg, "tx")

    def _maybe_trace_msg(self, session, msg, direction):
        """
        Record ``msg`` according to the trace level if the trace is running.

        :param session: The router session the message is received from / sent to.
        :param msg: The WAMP message.
        :param direction: ``"rx"`` or ``"tx"``.
        :raises Exception: If the trace level is neither "message" nor "action".
        """
        # FIXME: implement tracing filters
        if self._status != "running":
            return

        self.log.debug("{direction}: {msg}", direction=direction.upper(), msg=msg)

        if self._trace_level == "message":
            trace_record = TracedMessage(
                self._seq, session._realm, direction, session._session_id, session._authid, session._authrole, msg
            )
            self._current_batch.append(trace_record)
            self._seq += 1
        elif self._trace_level == "action":
            self._trace_action(session, msg)
        else:
            raise Exception('internal error: invalid trace level "{}"'.format(self._trace_level))

    def _trace_action(self, session, msg):
        """Update the open actions with ``msg`` (trace_level == "action")."""
        # 1) the anchor message of an RPC/PubSub action opens a new traced action
        if msg.correlation_is_anchor and isinstance(msg, self._ACTION_ANCHOR_TYPES):
            traced_action = TracedAction(
                msg.correlation_id,
                msg.correlation_uri,
                self._seq,
                session._realm,
                msg.__class__.__name__,
                session._session_id,
                [],
            )
            self._open_actions[msg.correlation_id] = traced_action
            self._seq += 1
            self.log.debug("New TRACE ACTION: {traced_action}", traced_action=traced_action)

        # 2) an invocation / event belonging to an open action records a responder
        if isinstance(msg, (message.Invocation, message.Event)):
            open_action = self._open_actions.get(msg.correlation_id)
            if open_action is not None:
                open_action.responders.append(
                    {
                        "session_id": session._session_id,
                        "authid": session._authid,
                        "authrole": session._authrole,
                        "enc_algo": msg.enc_algo,
                        "enc_key": msg.enc_key,
                        "enc_serializer": msg.enc_serializer,
                    }
                )

        # 3) the last message of the correlation closes the action and moves it
        #    into the current batch
        if msg.correlation_is_last:
            traced_action = self._open_actions.pop(msg.correlation_id, None)
            if traced_action is not None:
                traced_action.success = not isinstance(msg, message.Error)
                self._current_batch.append(traced_action)
                self.log.debug("TRACE ACTION finished: {traced_action}", traced_action=traced_action)