###############################################################################
#
# Crossbar.io Master
# Copyright (c) typedef int GmbH. Licensed under EUPLv1.2.
#
###############################################################################
import pprint
import uuid
from pprint import pformat
from typing import Any, Dict, List, Optional, Tuple
import humanize
import iso8601
import six
import txaio
import zlmdb
from autobahn import wamp
from autobahn.twisted.wamp import ApplicationSession
from autobahn.util import utcnow
from autobahn.wamp.exception import ApplicationError
from autobahn.wamp.types import CallDetails, EventDetails, PublishOptions, RegisterOptions, SubscribeOptions
from cfxdb.globalschema import GlobalSchema
from cfxdb.log import MNodeLog, MWorkerLog
from cfxdb.mrealmschema import MrealmSchema
from twisted.internet import defer
from twisted.internet.defer import Deferred, DeferredList, inlineCallbacks, returnValue
from twisted.internet.task import LoopingCall
from twisted.web import client
from txaio import make_logger, time_ns
from crossbar._util import hl, hlid, hltype, hlval
from crossbar.common.key import _read_release_key
from crossbar.master.api import APIS
from crossbar.master.arealm import ApplicationRealmManager
from crossbar.master.cluster import RouterClusterManager, WebClusterManager
from crossbar.master.mrealm.metadata import MetadataManager
__all__ = ("MrealmController",)

# turn off the start/stop log messages of Twisted's HTTP/1.1 client factory
setattr(client._HTTP11ClientFactory, "noisy", False)
class Node(object):
    """
    Run-time representation of CF nodes currently connected to this management realm.

    Instances are kept in the controller's in-memory node map and serialized with
    :meth:`marshal` when node lifecycle events are published.
    """

    # codes stored in ``last_activity``: the kind of event that last touched the node
    LAST_ACTIVITY_NONE = 0
    LAST_ACTIVITY_CREATED = 1
    LAST_ACTIVITY_STARTED = 2
    LAST_ACTIVITY_READY = 3
    LAST_ACTIVITY_HEARTBEAT = 4
    LAST_ACTIVITY_CHECK = 5

    def __init__(
        self,
        node_id=None,
        heartbeat_counter=None,
        heartbeat_time=None,
        heartbeat_workers=None,
        status="online",
        node_authid=None,
    ):
        """
        :param node_id: Node object ID (the controller keys its run-time map by the string
            form of the node UUID).
        :param heartbeat_counter: Sequence number of the last heartbeat received from the node.
        :param heartbeat_time: Timestamp of the last heartbeat; ``None`` is stored as ``0``.
        :param heartbeat_workers: Map of worker type to worker count reported by the node.
            Defaults to a new, empty dict for every node.
        :param status: Run-time status, ``"online"`` or ``"offline"``.
        :param node_authid: WAMP authid of the node.
        """
        self.node_id = node_id
        self.node_authid = node_authid
        self.status = status
        # time of the last activity, in nanoseconds (``time_ns``)
        self.last_active = time_ns()
        self.last_activity = Node.LAST_ACTIVITY_CREATED
        self.heartbeat_counter = heartbeat_counter
        self.heartbeat_time = heartbeat_time or 0
        # a fresh dict per node: a ``{}`` default argument would be one dict shared by all nodes
        self.heartbeat_workers = {} if heartbeat_workers is None else heartbeat_workers

    def __str__(self):
        return pprint.pformat(self.marshal())

    def marshal(self):
        """
        Serialize the node into a plain dict (as published with node lifecycle events).

        :rtype: dict
        """
        return {
            "id": self.node_id,
            "authid": self.node_authid,
            "status": self.status,
            "last_active": self.last_active,
            "last_activity": self.last_activity,
            # these data items are received from managed nodes
            "workers": self.heartbeat_workers,
            "counter": self.heartbeat_counter,
            "time": self.heartbeat_time,
        }
class Trace(object):
    """
    Run-time record of a trace defined on this management realm.
    """

    def __init__(self, trace_id, traced_workers, trace_options, eligible_reader_roles, exclude_reader_roles, status):
        (
            self.trace_id,
            self.traced_workers,
            self.trace_options,
            self.eligible_reader_roles,
            self.exclude_reader_roles,
            self.status,
        ) = (trace_id, traced_workers, trace_options, eligible_reader_roles, exclude_reader_roles, status)

    def marshal(self):
        """
        Return the trace as a plain dict holding the constructor values under their argument names.

        :rtype: dict
        """
        return dict(
            trace_id=self.trace_id,
            traced_workers=self.traced_workers,
            trace_options=self.trace_options,
            eligible_reader_roles=self.eligible_reader_roles,
            exclude_reader_roles=self.exclude_reader_roles,
            status=self.status,
        )
[docs]
class MrealmController(ApplicationSession):
"""
Backend of user created management realms.
When a management realm is created, one instance of this component is started.
This management realm component is then running continuously during the lifetime
of the management realm.
"""
def onUserError(self, fail, msg):
    """
    Implements :func:`autobahn.wamp.interfaces.ISession.onUserError`

    WAMP application errors raised by user code are logged at debug level only (with
    their error message); any other failure is logged as an error including the traceback.
    """
    klass_name = self.__class__.__name__
    err = fail.value
    if not isinstance(err, ApplicationError):
        self.log.error(
            '{klass}.onUserError(): "{msg}"\n{traceback}',
            klass=klass_name,
            msg=msg,
            traceback=txaio.failure_format_traceback(fail),
        )
        return
    self.log.debug('{klass}.onUserError(): "{msg}"', klass=klass_name, msg=err.error_message())
def __init__(self, config=None):
    """
    Set up the management realm controller from ``config.extra``.

    Opens the global (controller) database and the management realm database, and creates
    the web cluster, router cluster and application realm managers. Background loops and
    the run-time node maps are created later, in ``_initialize()``.

    :param config: Component configuration. ``config.extra`` must provide ``"mrealm"`` (the
        management realm object ID as a string), plus ``"controller-database"`` and
        ``"database"``. Each database entry is a dict with ``"dbfile"`` (str) and
        ``"maxsize"`` (int, 1 MB to 10 GB).
    """
    ApplicationSession.__init__(self, config)

    # comes from "crossbar/master/node/node.py" (L299, L282)
    assert config.extra

    self._uri_prefix = "crossbarfabriccenter.mrealm."

    self._node_oid_by_name = None

    # background loop run periodically to health check and send heartbeats for this controller.
    # It is only created in _initialize(). Set it here so that onLeave() can always test it, even
    # when initialization failed before the loop was created and onJoin() called leave().
    self._tick_loop = None

    # background loop run periodically to check & apply mrealm-level resources
    self._check_and_apply_loop = None
    self._check_and_apply_in_progress = False

    # Release (public) key
    self._release_pubkey_hex = _read_release_key()["hex"]

    assert "mrealm" in config.extra and isinstance(config.extra["mrealm"], str)
    self._mrealm_oid = uuid.UUID(config.extra["mrealm"])

    def _database_config(key):
        # validate a database config item {"dbfile": str, "maxsize": int} and return both values
        dbcfg = config.extra.get(key, {})
        assert dbcfg and isinstance(dbcfg, dict)
        dbfile = dbcfg.get("dbfile", None)
        assert dbfile and isinstance(dbfile, six.text_type)
        maxsize = dbcfg.get("maxsize", None)
        assert maxsize and type(maxsize) in six.integer_types
        assert maxsize >= 2**20 and maxsize < 2**30 * 10  # 1 MB - 10 GB maximum size
        return dbfile, maxsize

    # controller database
    #
    dbfile, maxsize = _database_config("controller-database")
    self.gdb = zlmdb.Database.open(dbpath=dbfile, maxsize=maxsize, readonly=False, sync=True, context=self)
    self.gdb.__enter__()
    self.gschema: GlobalSchema = GlobalSchema.attach(self.gdb)
    self.log.info(
        "global database initialized [dbfile={dbfile}, maxsize={maxsize}]",
        dbfile=hlval(dbfile),
        maxsize=hlval(maxsize),
    )

    # mrealm database
    #
    dbfile, maxsize = _database_config("database")
    self.db = zlmdb.Database.open(dbpath=dbfile, maxsize=maxsize, readonly=False, sync=True, context=self)
    self.db.__enter__()
    self.schema = MrealmSchema.attach(self.db)
    self.log.info(
        "management realm database initialized [dbfile={dbfile}, maxsize={maxsize}]",
        dbfile=hlid(dbfile),
        maxsize=hlval(maxsize),
    )

    # NOTE(review): onJoin() starts ``self._metadata_manager``, but it is not assigned in the visible
    # code of this class (MetadataManager is imported but never instantiated) - confirm where it is
    # created.
    self._webcluster_manager = WebClusterManager(self, self.gdb, self.gschema, self.db, self.schema)
    self._routercluster_manager = RouterClusterManager(self, self.gdb, self.gschema, self.db, self.schema)
    self._arealm_manager = ApplicationRealmManager(self, self.gdb, self.gschema, self.db, self.schema)
@property
def nodes(self):
    """
    Map of the run-time node representations of this management realm.

    The map is keyed by node ID (the string form of the node object ID).

    :return: Map of node object ID to :class:`crossbar.master.mrealm.controller.Node`
    :rtype: dict
    """
    runtime_nodes = self._nodes
    return runtime_nodes
def node(self, node_authid):
    """
    Get the run-time representation of a node by its WAMP authid (rather than its object ID).

    :param node_authid: WAMP authid of the node.
    :return: The :class:`Node` from the run-time map, or ``None`` when the authid is not
        known in this management realm or the node has no run-time representation.
    """
    with self.gdb.begin() as txn:
        node_oid = self.gschema.idx_nodes_by_authid[txn, (self._mrealm_oid, node_authid)]
    if node_oid is None:
        # unknown authid: do not look up str(None) == "None" in the run-time map
        return None
    # currently, nodes are indexed by str-type UUID in the run-time map
    return self._nodes.get(str(node_oid), None)
def map_node_oid_to_authid(self, node_oid):
    """
    Map a node object ID to the node's WAMP authid.

    The node is looked up in the global database (not in the run-time node map), so the
    mapping does not depend on whether the node is currently connected.

    :param node_oid: Node object ID, as a string or as a :class:`uuid.UUID`.
    :return: The node authid, or ``None`` if no node with this object ID is found.
    """
    if not isinstance(node_oid, uuid.UUID):
        node_oid = uuid.UUID(node_oid)

    with self.gdb.begin() as txn:
        node = self.gschema.nodes[txn, node_oid]

    if not node:
        self.log.warn(
            "{func}: could not map node object ID {node_oid} to any node authid",
            func=hltype(self.map_node_oid_to_authid),
            node_oid=hlid(node_oid),
        )
        return None

    self.log.debug(
        '{func}: mapped node object ID {node_oid} to node authid "{node_authid}"',
        func=hltype(self.map_node_oid_to_authid),
        node_oid=hlid(node_oid),
        node_authid=hlid(node.authid),
    )
    return node.authid
@inlineCallbacks
def onJoin(self, details):
    """
    Bring up the management realm controller once the session has joined.

    Runs ``_initialize()``. If that fails, the failure is logged and the session leaves
    immediately. Otherwise the metadata, web cluster, router cluster and application realm
    managers are started, each under its own URI prefix below ``self._uri_prefix``.
    """
    # initialize this mrealm run-time representation
    try:
        yield self._initialize(details)
    except Exception:
        # immediately close down if there is any error during mrealm initialization
        self.log.failure()
        self.leave()
        return

    self.log.info(
        'Management controller started for management realm "{realm}" {func}',
        func=hltype(self.onJoin),
        realm=hlid(details.realm),
    )

    # initialize Metadata manager
    # NOTE(review): self._metadata_manager is not assigned in the visible code of this class - confirm
    # where it is created.
    yield self._metadata_manager.start(prefix=self._uri_prefix + "metadata.")

    # initialize Web cluster manager
    yield self._webcluster_manager.start(prefix=self._uri_prefix + "webcluster.")

    # initialize router cluster manager
    yield self._routercluster_manager.start(prefix=self._uri_prefix + "routercluster.")

    # initialize application realm manager
    yield self._arealm_manager.start(prefix=self._uri_prefix + "arealm.")

    self.log.info(
        'Management realm controller ready for management realm {mrealm_oid}! (realm="{realm}", session={session}, authid="{authid}", authrole="{authrole}") [{func}]',
        mrealm_oid=hlid(self._mrealm_oid),
        realm=hlval(details.realm),
        session=hlval(details.session),
        authid=hlval(details.authid),
        authrole=hlval(details.authrole),
        func=hltype(self.onJoin),
    )
def onLeave(self, details):
    """
    Stop the background loops of this controller, then run the default
    ``ApplicationSession.onLeave`` handling.

    Safe to call even when ``_initialize()`` did not get far enough to create the loops.
    """
    # first, stop background "check & apply" task
    check_and_apply_loop = getattr(self, "_check_and_apply_loop", None)
    if check_and_apply_loop and check_and_apply_loop.running:
        check_and_apply_loop.stop()
    self._check_and_apply_loop = None

    # next, stop own heartbeat ("tick") task. _tick_loop is only created in _initialize(), so it can
    # be missing when initialization failed early and onJoin() called leave().
    tick_loop = getattr(self, "_tick_loop", None)
    if tick_loop and tick_loop.running:
        tick_loop.stop()
    self._tick_loop = None

    return ApplicationSession.onLeave(self, details)
@inlineCallbacks
def _initialize(self, details):
    """
    Set up the run-time state and the WAMP API of this management realm controller.

    Steps, in order:

    - reset the run-time maps;
    - subscribe to node and session lifecycle events;
    - start the periodic tick loop (every 5s) and the check & apply loop (every 10s);
    - register the public management API and the node remoting APIs;
    - subscribe the remoting API topics;
    - initialize the tracing API.

    :param details: Session join details (not used by this method).
    """
    # "self.config.extra" comes from "crossbar/master/node/node.py"
    self.log.debug(
        '{klass} starting realm "{realm}" with config:\n{config}',
        klass=self.__class__.__name__,
        realm=self._realm,
        config=pprint.pformat(self.config.extra),
    )

    self._started = utcnow()
    self._nodes = {}
    self._nodes_shutdown = {}
    self._node_oid_by_name = {}
    self._sessions = {}
    self._traces = {}

    # subscribe to node lifecycle events
    yield self.subscribe(
        self._on_node_ready,
        "crossbarfabriccenter.node..on_ready",
        SubscribeOptions(match="wildcard", details=True),
    )

    # we subscribe to CF node heartbeat events, to track when we have last heard of a specific
    # node, and also to react (after some time) should we neither receive an "on_shutdown" nor
    # an "on_leave" event, so we can purge the node from our active list.
    yield self.subscribe(
        self._on_node_heartbeat, "crossbarfabriccenter.node.on_heartbeat", SubscribeOptions(details=True)
    )
    yield self.subscribe(
        self._on_worker_heartbeat, "crossbarfabriccenter.node.on_worker_heartbeat", SubscribeOptions(details=True)
    )

    # when a CF node is gracefully shut down, we will receive this event. when the CF node
    # is killed, see "session lifecycle events" below.
    yield self.subscribe(
        self._on_node_shutdown,
        "crossbarfabriccenter.node..on_shutdown",
        SubscribeOptions(match="wildcard", details=True),
    )

    # subscribe to session lifecycle events.
    yield self.subscribe(self._on_session_startup, "wamp.session.on_join", SubscribeOptions(details=True))

    # eg when a CF node is hard-killed, the management session will simply get lost, which
    # is detected by CFC router, and a WAMP session leave meta event is published. however,
    # no "on_shutdown" event is published! the CF node has been killed and had no chance to
    # send out any management events. hence we must react to this event.
    yield self.subscribe(self._on_session_shutdown, "wamp.session.on_leave", SubscribeOptions(details=True))

    # produce CFCs own heartbeat on the management realm
    self._tick = 1
    tick_topic = "{}on_tick".format(self._uri_prefix)

    @inlineCallbacks
    def on_tick():
        if self.is_attached():
            ticked = {"now": utcnow(), "tick": self._tick}
            try:
                yield self.publish(tick_topic, ticked, options=PublishOptions(acknowledge=True))
            except Exception:
                # an exception escaping from here would stop the LoopingCall for good
                self.log.failure()
        else:
            self.log.warn(
                'cannot send tick to CF node (topic="{tick_topic}", mrealm="{mrealm}"): management realm controller session no longer attached!',
                tick_topic=tick_topic,
                mrealm=self._realm,
            )
        self._tick += 1

    self._tick_loop = LoopingCall(on_tick)
    self._tick_loop.start(5)

    self._check_and_apply_loop = LoopingCall(self.check_and_apply)
    self._check_and_apply_loop.start(10)

    def _unwrap(results):
        # DeferredList yields (success, result) pairs and does not fail by itself. Re-raise the first
        # failure so the caller sees the real error, not an AttributeError on a Failure object.
        for success, result in results:
            if not success:
                result.raiseException()
        return [result for _, result in results]

    # CFC public API
    #
    # register the public, user facing API of CFC. this is the API that
    # user management components, our own CLI (cbsh) or CFC Web user interface
    # will call into. since this is public API, special care should be taken
    # with the design, keeping the API surface limited, logical and extensible

    # management controller top-level API (on this object)
    regs = yield self.register(self, prefix=self._uri_prefix, options=RegisterOptions(details_arg="details"))
    procs = [reg.procedure for reg in regs]
    self.log.debug(
        "Mrealm controller {api} registered management procedures [{func}]:\n\n{procs}\n",
        api=hl("Realm Home API", color="green", bold=True),
        func=hltype(self._initialize),
        procs=hl(pformat(procs), color="white", bold=True),
    )

    # different remoting APIs for node management
    self._apis = APIS

    # setup procedures of APIs
    all_regs = []
    for api in self._apis:
        all_regs.extend(api.register(self))
    regs = _unwrap((yield DeferredList(all_regs, consumeErrors=True)))
    procs = [reg.procedure for reg in regs]
    self.log.debug(
        "Mrealm controller {api} registered management procedures [{func}]:\n\n{procs}\n",
        api=hl("Node Remoting API", color="magenta", bold=True),
        func=hltype(self._initialize),
        procs=hl(pformat(procs), color="white", bold=True),
    )

    # setup topics of APIs
    all_subs = []
    for api in self._apis:
        all_subs.extend(api.subscribe(self))
    subs = _unwrap((yield DeferredList(all_subs, consumeErrors=True)))
    topics = [sub.topic for sub in subs]
    self.log.debug(
        "Mrealm controller {api} subscribed management topics [{func}]:\n\n{topics}\n",
        api=hl("Node Remoting API", color="magenta", bold=True),
        func=hltype(self._initialize),
        topics=hl(pformat(topics), color="white", bold=True),
    )

    # initialize tracing API
    yield self._init_trace_api()
@inlineCallbacks
def check_and_apply(self):
    """
    Run one check & apply pass over all nodes in the run-time node map.

    Driven by the check & apply ``LoopingCall`` started in ``_initialize()``. For every node,
    ``crossbarfabriccenter.remote.node.get_status`` is called:

    - on success, the node is marked ``"online"``, and ``on_node_ready`` is published if it
      was not online before;
    - on failure, the node is marked ``"offline"``, and ``on_node_shutdown`` is published if
      it was online before.

    Runs never overlap: a run started while the previous one is still in progress is skipped.
    The in-progress flag is always reset, and unexpected errors are logged instead of
    escaping, so a single bad run does not stop the periodic loop.
    """
    if self._check_and_apply_in_progress:
        self.log.info(
            "{func} {action} for mrealm {mrealm} skipped! check & apply already in progress.",
            action=hl("check & apply run skipped", color="red", bold=True),
            func=hltype(self.check_and_apply),
            mrealm=hlid(self._mrealm_oid),
        )
        return

    self.log.info(
        "{func} {action} for mrealm {mrealm} ..",
        action=hl("check & apply run started", color="green", bold=True),
        func=hltype(self.check_and_apply),
        mrealm=hlid(self._mrealm_oid),
    )
    self._check_and_apply_in_progress = True
    try:
        if not self.is_attached():
            return

        is_running_completely = True
        cnt_nodes_online = 0
        cnt_nodes_offline = 0

        # iterate over a snapshot of the keys: the node event handlers may add nodes to self._nodes
        # while this generator is suspended at one of the yields below
        for node_id in list(self._nodes):
            try:
                node_status = yield self.call("crossbarfabriccenter.remote.node.get_status", node_id)
            except Exception as e:
                node = self._nodes.get(node_id, None)
                if node is None:
                    # node vanished from the run-time map while we were waiting for the call
                    continue
                cnt_nodes_offline += 1
                is_running_completely = False
                if isinstance(e, ApplicationError) and e.error == "wamp.error.no_such_procedure":
                    if node.status == "offline":
                        # this is "expected" - we already knew that the node is offline, and hence the call is failing
                        # because of "no_such_procedure" is exactly what will happen as the node is offline
                        self.log.warn(
                            "{action} [status={status}] {func}",
                            action=hl(
                                'Warning, managed node "{}" still not connected or operational'.format(node_id),
                                color="red",
                                bold=False,
                            ),
                            status=hlval(node.status),
                            func=hltype(self.check_and_apply),
                        )
                    elif node.status == "online":
                        self._nodes_shutdown[node_id] = time_ns()
                        # mark node as offline in run-time map
                        node.status = "offline"
                        # publish management event
                        yield self._publish_on_node_shutdown_yield(node)
                        self.log.info(
                            "{action} [oid={node_oid}, session={session_id}, status={status}] {func}",
                            action=hl(
                                'Warning: managed node "{}" became offline'.format(node_id), color="red", bold=True
                            ),
                            node_oid=hlid(node_id),
                            session_id=hlid(None),
                            status=hlval(node.status),
                            func=hltype(self.check_and_apply),
                        )
                    else:
                        self.log.warn(
                            "{func}: unexpected run-time node status {status} for node {node_id}",
                            node_id=hlid(node_id),
                            func=hltype(self.check_and_apply),
                            status=hlval(node.status),
                        )
                        node.status = "offline"
                else:
                    node.status = "offline"
                    self.log.warn(
                        "{action} [status={status}] {func}",
                        action=hl(
                            'Warning: check on managed node "{}" failed: {}'.format(node_id, e), color="red", bold=True
                        ),
                        status=hlval(node.status),
                        func=hltype(self.check_and_apply),
                    )
            else:
                node = self._nodes.get(node_id, None)
                if node is None:
                    # node vanished from the run-time map while we were waiting for the call
                    continue
                cnt_nodes_online += 1
                node.heartbeat_workers = node_status["workers_by_type"]
                node.last_activity = Node.LAST_ACTIVITY_CHECK
                node.last_active = time_ns()
                if node.status == "online":
                    self.log.info(
                        "{action} [status={status}] {func}",
                        action=hl('Ok, managed node "{}" is still healthy'.format(node_id), color="green", bold=False),
                        status=hlval(node.status),
                        func=hltype(self.check_and_apply),
                    )
                else:
                    self.log.info(
                        '{action} [status={status} -> "{new_status}"] {func}',
                        action=hl(
                            'Ok, managed node "{}" became healthy (again)'.format(node_id), color="yellow", bold=True
                        ),
                        status=hlval(node.status),
                        new_status=hlval("online"),
                        func=hltype(self.check_and_apply),
                    )
                    node.status = "online"
                    # publish "on_node_ready" management event
                    yield self._publish_on_node_ready_yield(node)

        if is_running_completely:
            color = "green"
            action = "check & apply run completed successfully"
        else:
            color = "red"
            action = "check & apply run finished with problems left"

        self.log.info(
            "{func} {action} for mrealm {mrealm}: {cnt_nodes_online} nodes online, {cnt_nodes_offline} nodes offline.",
            action=hl(action, color=color, bold=True),
            func=hltype(self.check_and_apply),
            mrealm=hlid(self._mrealm_oid),
            cnt_nodes_online=hlval(cnt_nodes_online),
            cnt_nodes_offline=hlval(cnt_nodes_offline),
        )
    except Exception:
        # this runs from a LoopingCall: an exception escaping from here would stop all future runs
        self.log.failure()
    finally:
        # always clear the flag, otherwise every later run would be skipped as "already in progress"
        self._check_and_apply_in_progress = False
@inlineCallbacks
def _publish_node_event(self, topic, node):
    """
    Publish a node lifecycle event on this management realm and forward it to the master controller.

    The event ``<uri prefix><topic>`` is published with the positional arguments
    ``(node.node_id, node.marshal())`` and ``acknowledge=True``. It goes out first via this
    session, then via ``self.config.controller``.

    Shared by the coroutine and Deferred flavors below, so both always stay in sync.

    :param topic: Event name relative to the URI prefix, e.g. ``"on_node_ready"``.
    :param node: The node the event is about.
    :type node: :class:`Node`
    """
    options = PublishOptions(acknowledge=True)
    uri = "{}{}".format(self._uri_prefix, topic)
    obj = node.marshal()
    yield self.publish(uri, node.node_id, obj, options=options)
    controller = self.config.controller
    yield controller.publish(uri, node.node_id, obj, options=options)
    self.log.debug(
        ".. forward published event to controller for managed node {node_id} (uri={uri}, session_id={session_id}, realm={realm}, authid={authid}, authrole={authrole})",
        node_id=node.node_id,
        uri=uri,
        session_id=controller._session_id,
        realm=controller._realm,
        authid=controller._authid,
        authrole=controller._authrole,
    )

async def _publish_on_node_ready(self, node):
    """
    Publish ``on_node_ready`` for ``node`` (coroutine flavor, for ``async`` event handlers).
    """
    await self._publish_node_event("on_node_ready", node)

@inlineCallbacks
def _publish_on_node_ready_yield(self, node):
    """
    Publish ``on_node_ready`` for ``node`` (Deferred flavor, for ``@inlineCallbacks`` code such
    as ``check_and_apply``).
    """
    yield self._publish_node_event("on_node_ready", node)

async def _publish_on_node_shutdown(self, node):
    """
    Publish ``on_node_shutdown`` for ``node`` (coroutine flavor, for ``async`` event handlers).
    """
    await self._publish_node_event("on_node_shutdown", node)

@inlineCallbacks
def _publish_on_node_shutdown_yield(self, node):
    """
    Publish ``on_node_shutdown`` for ``node`` (Deferred flavor, for ``@inlineCallbacks`` code such
    as ``check_and_apply``).
    """
    yield self._publish_node_event("on_node_shutdown", node)
async def _on_node_ready(self, ready_info=None, details: Optional[EventDetails] = None):
    """
    Handle a ``crossbarfabriccenter.node..on_ready`` event.

    (Re)creates the run-time representation of the node, stores it in the run-time node map
    (replacing any existing entry) and publishes ``on_node_ready``. Events without a node ID
    are logged and ignored.

    :param ready_info: Event payload; the node is identified by its ``"node_id"`` item.
    :type ready_info: dict or None
    :param details: Event details.
    :type details: :class:`autobahn.wamp.types.EventDetails`
    """
    node_id = ready_info.get("node_id", None) if ready_info else None
    self.log.info(
        'Node "{node_id}" is ready: {ready_info} {details}',
        node_id=node_id,
        ready_info=ready_info,
        details=details,
    )
    if not node_id:
        # without an ID the node cannot be tracked (it would end up under the key None)
        self.log.warn('Node-ready event without "node_id" ignored: {ready_info}', ready_info=ready_info)
        return

    # NOTE(review): other handlers key self._nodes by str(node_oid); confirm that
    # ready_info["node_id"] uses the same form.
    if node_id in self._nodes:
        self.log.warn(
            'Node-ready event for "{node_id}", but node already found in active node list (replacing it)!',
            node_id=node_id,
        )
    _node = Node(node_id)
    _node.last_activity = Node.LAST_ACTIVITY_READY
    _node.last_active = time_ns()
    self._nodes[node_id] = _node
    await self._publish_on_node_ready(_node)
    self.log.debug("{func}: completed!", func=hltype(self._on_node_ready))
async def _on_worker_heartbeat(self, node_authid, worker_id, heartbeat, details: Optional[EventDetails] = None):
    """
    Receive heartbeats from workers run on CF nodes managed by this CFC instance, and persist them.

    By default, CF node workers will send one heartbeat every 10 seconds. Each heartbeat is
    parsed into a :class:`cfxdb.log.MWorkerLog` and stored in the management realm database.
    Heartbeats for authids unknown in this management realm are logged and dropped.

    :param node_authid: The WAMP authid of the node running the worker.
    :type node_authid: str
    :param worker_id: The local worker ID on the remote CF node (_not_ the UUID of the worker
        in the master database)
    :type worker_id: str
    :param heartbeat: Worker heartbeat (``"timestamp"`` and ``"seq"`` items are logged).
    :type heartbeat: dict
    :param details: Event details.
    :type details: :class:`autobahn.wamp.types.EventDetails`
    """
    assert isinstance(node_authid, str)
    assert isinstance(worker_id, str)
    assert isinstance(heartbeat, dict)
    assert details is None or isinstance(details, EventDetails)

    started = time_ns()

    heartbeat_time = heartbeat.get("timestamp", None)
    heartbeat_seq = heartbeat.get("seq", None)

    self.log.debug(
        'Heartbeat from worker "{worker_id}" on node "{node_authid}" [heartbeat_seq={heartbeat_seq}, time={heartbeat_time}, publisher={publisher}, authid={authid}]',
        node_authid=hlid(node_authid),
        worker_id=hlid(worker_id),
        heartbeat_seq=hlid(heartbeat_seq),
        heartbeat_time=hlid(heartbeat_time),
        publisher=hlid(details.publisher) if details else None,
        authid=hlid(details.publisher_authid) if details else None,
    )
    self.log.debug("Raw worker heartbeat: \n{heartbeat}", heartbeat=pprint.pformat(heartbeat))

    with self.gdb.begin() as txn:
        node_uuid = self.gschema.idx_nodes_by_authid[txn, (self._mrealm_oid, node_authid)]
        node = self.gschema.nodes[txn, node_uuid] if node_uuid else None

    if not node:
        # unknown authid: nothing to attribute the heartbeat to
        self.log.warn(
            '{func}: worker heartbeat from unknown node "{node_authid}" ignored (worker "{worker_id}")',
            func=hltype(self._on_worker_heartbeat),
            node_authid=hlid(node_authid),
            worker_id=hlid(worker_id),
        )
        return

    node_oid = str(node_uuid)
    mrealm_id = node.mrealm_oid

    mworker_log = MWorkerLog.parse(mrealm_id, uuid.UUID(node_oid), worker_id, heartbeat)
    self.log.debug("Parsed worker heartbeat: \n{mworker_log}", mworker_log=pprint.pformat(mworker_log.marshal()))

    with self.db.begin(write=True) as txn:
        self.schema.mworker_logs[txn, (mworker_log.timestamp, mworker_log.node_id, mworker_log.worker_id)] = (
            mworker_log
        )

    ended = time_ns()
    runtime = int(round((ended - started) / 1000000.0))

    # taking longer than 250ms means: sth is likely wrong ..
    if runtime > 250:
        self.log.warn(
            'Worker heartbeat excessive processing time {runtime} ms! [node_oid="{node_oid}", worker_id="{worker_id}", timestamp="{timestamp}", sent="{sent}", seq={seq}]',
            runtime=runtime,
            timestamp=mworker_log.timestamp,
            sent=mworker_log.sent,
            seq=mworker_log.seq,
            node_oid=node_oid,
            worker_id=worker_id,
        )
    else:
        self.log.debug(
            'Worker heartbeat processed and stored in {runtime} ms [node_oid="{node_oid}", worker_id="{worker_id}", timestamp="{timestamp}", sent="{sent}", seq={seq}]',
            runtime=runtime,
            timestamp=mworker_log.timestamp,
            sent=mworker_log.sent,
            seq=mworker_log.seq,
            node_oid=node_oid,
            worker_id=worker_id,
        )
    self.log.debug("{func}: completed!", func=hltype(self._on_worker_heartbeat))
async def _on_node_heartbeat(self, node_authid, heartbeat, details: Optional[EventDetails] = None):
    """
    Receive heartbeats from CF nodes managed by this CFC instance.

    By default, CF nodes will send one heartbeat every 10 seconds. The handler:

    - validates the heartbeat;
    - updates or creates the node's run-time representation and marks the node ``"online"``;
    - persists the heartbeat as a :class:`cfxdb.log.MNodeLog` when its pubkey matches the
      node's pubkey in the global database.

    Heartbeats for authids unknown in this management realm are logged and ignored.

    :param node_authid: The node WAMP auth ID.
    :type node_authid: str
    :param heartbeat: Node heartbeat. Must carry integer ``"timestamp"`` and ``"seq"`` items.
        May carry ``"pubkey"`` (64 characters) and ``"workers"`` (map of worker type to count).
    :type heartbeat: dict
    :param details: Event details.
    :type details: :class:`autobahn.wamp.types.EventDetails`
    """
    assert isinstance(node_authid, str)
    assert isinstance(heartbeat, dict)
    assert details is None or isinstance(details, EventDetails)

    started = time_ns()

    heartbeat_time = heartbeat.get("timestamp", None)
    assert isinstance(heartbeat_time, int)

    heartbeat_seq = heartbeat.get("seq", None)
    assert isinstance(heartbeat_seq, int)

    heartbeat_pubkey = heartbeat.get("pubkey", None)
    assert heartbeat_pubkey is None or (isinstance(heartbeat_pubkey, str) and len(heartbeat_pubkey) == 64)

    heartbeat_workers = heartbeat.get("workers", {})
    assert isinstance(heartbeat_workers, dict)

    # FIXME: hard-coded list of known worker types (built once, not per worker type)
    ALLOWED_WORKER_TYPES = [
        "controller",
        "router",
        "container",
        "guest",
        "proxy",
        "hostmonitor",
        "xbrmm",
        "xbr_marketmaker",
        "marketplace",
    ]
    for worker_type in heartbeat_workers:
        assert worker_type in ALLOWED_WORKER_TYPES, 'invalid worker type "{}" (valid types: {})'.format(
            worker_type, ALLOWED_WORKER_TYPES
        )
        assert isinstance(heartbeat_workers[worker_type], int)

    self.log.debug(
        'Heartbeat from node "{node_authid}" with workers {heartbeat_workers} [heartbeat_seq={heartbeat_seq}, time={heartbeat_time}, publisher={publisher}, authid={authid}]',
        node_authid=hlid(node_authid),
        heartbeat_workers=heartbeat_workers,
        heartbeat_seq=hlid(heartbeat_seq),
        heartbeat_time=hlid(heartbeat_time),
        publisher=hlid(details.publisher) if details else None,
        authid=hlid(details.publisher_authid) if details else None,
    )
    self.log.debug("Raw node heartbeat:\n{heartbeat}", heartbeat=pprint.pformat(heartbeat))

    with self.gdb.begin() as txn:
        node_uuid = self.gschema.idx_nodes_by_authid[txn, (self._mrealm_oid, node_authid)]
        node = self.gschema.nodes[txn, node_uuid] if node_uuid else None

    if not node:
        # unknown authid: do not create a bogus run-time node (keyed "None") for it
        self.log.warn(
            '{func}: heartbeat from unknown node "{node_authid}" ignored (not found in management realm {mrealm})',
            func=hltype(self._on_node_heartbeat),
            node_authid=hlid(node_authid),
            mrealm=hlid(self._mrealm_oid),
        )
        return

    # currently, nodes are indexed by str-type UUID in the run-time map
    node_oid = str(node_uuid)

    if node_oid not in self._nodes:
        self.log.warn(
            'Heartbeat received from "{node_oid}", but node not found in online list (node will now be marked as online)',
            node_oid=node_oid,
        )
        node_shutdown = self._nodes_shutdown.get(node_oid, None)
        if not node_shutdown or (time_ns() - node_shutdown) > (60 * 10**9):
            # create run-time representation of node
            _node = Node(node_oid, heartbeat_seq, heartbeat_time, heartbeat_workers, node_authid=node_authid)
            _node.last_activity = Node.LAST_ACTIVITY_HEARTBEAT
            _node.last_active = time_ns()
            self._nodes[node_oid] = _node

            # publish "on_node_ready" management event
            await self._publish_on_node_ready(_node)

            self.log.info(
                "{action} [oid={node_oid}, session={session_id}, status={status}] {func}",
                action=hl('Success: managed node "{}" is now online'.format(node_authid), color="green", bold=True),
                node_oid=hlid(node_oid),
                session_id=hlid(details.publisher) if details else None,
                status=hlval(self._nodes[node_oid].status),
                func=hltype(self._on_node_heartbeat),
            )
    else:
        runtime_node = self._nodes[node_oid]
        runtime_node.heartbeat_counter = heartbeat_seq
        runtime_node.heartbeat_time = heartbeat_time
        runtime_node.heartbeat_workers = heartbeat_workers
        runtime_node.last_activity = Node.LAST_ACTIVITY_HEARTBEAT
        runtime_node.last_active = time_ns()
        if runtime_node.status == "online":
            self.log.debug(
                "{action} [oid={node_oid}, session={session_id}, status={status}] {func}",
                action=hl('Ok, managed node "{}" is still alive'.format(node_authid), color="green", bold=False),
                node_oid=hlid(node_oid),
                session_id=hlid(details.publisher) if details else None,
                status=hlval(runtime_node.status),
                func=hltype(self._on_node_heartbeat),
            )
        else:
            self.log.info(
                "{action} [oid={node_oid}, session={session_id}, status={status}] {func}",
                action=hl(
                    'Ok, managed node "{}" became alive (again) [status={} -> online]'.format(
                        node_authid, runtime_node.status
                    ),
                    color="yellow",
                    bold=True,
                ),
                node_oid=hlid(node_oid),
                session_id=hlid(details.publisher) if details else None,
                status=hlval(runtime_node.status),
                func=hltype(self._on_node_heartbeat),
            )
            # NOTE(review): unlike check_and_apply(), no "on_node_ready" event is published when a
            # node comes back online here - confirm this is intended.
            runtime_node.status = "online"

    # heartbeat['authid'] = details.publisher_authid
    heartbeat["authid"] = node_authid
    heartbeat["node_id"] = node_oid
    heartbeat["session"] = details.publisher if details else None

    mrealm_id = node.mrealm_oid

    mnode_log = MNodeLog.parse(mrealm_id, uuid.UUID(node_oid), heartbeat)
    self.log.debug("Parsed node heartbeat:\n{heartbeat}", heartbeat=pprint.pformat(mnode_log.marshal()))

    # this is the pubkey under which an aggregate usage record (see below) will be stored
    if node.pubkey == heartbeat_pubkey:
        with self.db.begin(write=True) as txn:
            self.schema.mnode_logs[txn, (mnode_log.timestamp, mnode_log.node_id)] = mnode_log

        self.log.debug(
            "{msg} [timestamp={timestamp}, node_id={node_id}]",
            msg=hl(
                'New node HEARTBEAT persisted in database -> checking for pubkey="{}"'.format(node.pubkey),
                bold=True,
            ),
            timestamp=hlid(mnode_log.timestamp),
            node_id=hlid(mnode_log.node_id),
        )

        ended = time_ns()
        runtime = int(round((ended - started) / 1000000.0))

        # taking longer than 250ms means: sth is likely wrong ..
        if runtime > 250:
            self.log.warn(
                'Node heartbeat excessive processing time {runtime} ms! [{node_oid}, timestamp={timestamp}, sent="{sent}", seq={seq}]',
                runtime=runtime,
                timestamp=mnode_log.timestamp,
                sent=mnode_log.sent,
                seq=mnode_log.seq,
                node_oid=node_oid,
            )
        else:
            self.log.debug(
                'Node heartbeat processed and stored in {runtime} ms [node_id={node_oid}, timestamp={timestamp}, sent="{sent}", seq={seq}]',
                runtime=runtime,
                timestamp=mnode_log.timestamp,
                sent=mnode_log.sent,
                seq=mnode_log.seq,
                node_oid=node_oid,
            )
    else:
        self.log.warn("heartbeat pubkey does not match pubkey for node matching node_id!")

    self.log.debug("{func}: completed!", func=hltype(self._on_node_heartbeat))
async def _on_node_shutdown(self, shutdown_info, details: Optional[EventDetails] = None):
    """
    Handle a ``crossbarfabriccenter.node..on_shutdown`` event sent by a gracefully stopping node.

    Records the shutdown time in ``self._nodes_shutdown``, marks the node ``"offline"`` in the
    run-time map and publishes ``on_node_shutdown``. Events for authids unknown in this
    management realm, or for nodes without a run-time representation, are logged and ignored.

    :param shutdown_info: Event payload; the node authid is taken from its ``"node_id"`` item.
    :type shutdown_info: dict or None
    :param details: Event details.
    :type details: :class:`autobahn.wamp.types.EventDetails`
    """
    node_authid = shutdown_info.get("node_id", None) if shutdown_info else None
    self.log.info(
        'node "{node_authid}" has shut down: {shutdown_info} {details}',
        node_authid=node_authid,
        shutdown_info=shutdown_info,
        details=details,
    )

    node_oid = None
    if node_authid:
        with self.gdb.begin() as txn:
            node_oid = self.gschema.idx_nodes_by_authid[txn, (self._mrealm_oid, node_authid)]

    if not node_oid:
        self.log.warn(
            '{func}: unrecognised node shutdown for "{node_authid}" - could not find node for authid',
            func=hltype(self._on_node_shutdown),
            node_authid=hlid(node_authid),
        )
        return

    # currently, nodes are indexed by str-type UUID in the run-time map
    node_oid = str(node_oid)
    node = self._nodes.get(node_oid, None)
    if node is None:
        self.log.warn(
            '{func}: unrecognised node shutdown for "{node_authid}" - node not in run-time map',
            func=hltype(self._on_node_shutdown),
            node_authid=hlid(node_authid),
        )
        return

    self._nodes_shutdown[node_oid] = time_ns()

    # mark node as offline in run-time map
    node.status = "offline"

    # publish management event
    await self._publish_on_node_shutdown(node)

    self.log.debug("{func}: completed!", func=hltype(self._on_node_shutdown))
[docs]
async def _on_session_startup(self, session, details: Optional[CallDetails] = None):
    """
    Handle a session joining the management realm.

    Only sessions with ``authrole == "node"`` (management uplinks of managed nodes) are
    processed:

    - the session ID is mapped to the node's authid in ``self._sessions``, which
      ``_on_session_shutdown`` uses to map the session back to the node;
    - if the node is not yet in the run-time map, a run-time representation with status
      ``"online"`` is created and an ``on_node_ready`` management event is published.

    Sessions whose authid does not match any node of this management realm are logged and
    ignored.

    :param session: Session information (read keys: ``"authrole"``, ``"session"``, ``"authid"``).
    :param details: Call/event details (unused).
    """
    if session.get("authrole") == "node":
        session_id = session.get("session")
        node_authid = session.get("authid")
        self._sessions[session_id] = node_authid

        with self.gdb.begin() as txn:
            node_oid = self.gschema.idx_nodes_by_authid[txn, (self._mrealm_oid, node_authid)]

        if not node_oid:
            # without this guard, str(None) == "None" would be added to the run-time map
            # and a bogus "node ready" event would be published
            self.log.warn(
                '{func}: unrecognised session startup for "{session_id}" - could not find node for authid "{node_authid}"',
                func=hltype(self._on_session_startup),
                session_id=hlid(session_id),
                node_authid=hlid(node_authid),
            )
            return

        # currently, nodes are indexed by str-type UUID in the run-time map
        node_oid = str(node_oid)

        # create run-time representation of node
        if node_oid not in self._nodes:
            _node = Node(node_oid)
            _node.last_activity = Node.LAST_ACTIVITY_STARTED
            _node.last_active = time_ns()
            _node.status = "online"
            self._nodes[node_oid] = _node

            # publish "on_node_ready" management event
            await self._publish_on_node_ready(_node)

        self.log.info(
            "{action} [oid={node_oid}, session={session_id}, status={status}] {func}",
            action=hl('Success: managed node "{}" is now online'.format(node_authid), color="green", bold=True),
            node_oid=hlid(node_oid),
            session_id=hlid(session_id),
            status=hlval(self._nodes[node_oid].status),
            func=hltype(self._on_session_startup),
        )

    self.log.debug("{func}: completed!", func=hltype(self._on_session_startup))
[docs]
async def _on_session_shutdown(self, session_id, details: Optional[CallDetails] = None):
    """
    Handle a session leaving the management realm.

    Sessions not registered in ``self._sessions`` are ignored; those are sessions that are
    not management uplinks of managed nodes. For node sessions:

    - the session mapping is dropped;
    - the node's shutdown time is recorded in ``self._nodes_shutdown``;
    - its run-time representation is marked ``"offline"``;
    - a node-shutdown management event is published.

    :param session_id: WAMP session ID of the session that closed.
    :param details: Call/event details (unused).
    """
    # we are only interested in session closes from management uplinks. The mapping is
    # removed here because the session ID is dead from now on; keeping it would make the map
    # grow with every node reconnect.
    node_authid = self._sessions.pop(session_id, None)
    if not node_authid:
        self.log.debug(
            '{func}: unrecognised session close for "{session_id}" - could not map session to authid',
            func=hltype(self._on_session_shutdown),
            session_id=hlid(session_id),
        )
        return

    with self.gdb.begin() as txn:
        node_oid = self.gschema.idx_nodes_by_authid[txn, (self._mrealm_oid, node_authid)]

    if not node_oid:
        self.log.warn(
            '{func}: unrecognised session close for "{session_id}" - could not find node for authid',
            func=hltype(self._on_session_shutdown),
            session_id=hlid(session_id),
        )
        return

    # currently, nodes are indexed by str-type UUID in the run-time map
    node_oid = str(node_oid)
    node = self._nodes.get(node_oid, None)
    if node is None:
        self.log.warn(
            '{func}: unrecognised session close for "{session_id}" - node not in run-time map',
            func=hltype(self._on_session_shutdown),
            session_id=hlid(session_id),
        )
        return

    self._nodes_shutdown[node_oid] = time_ns()

    # mark node as offline in run-time map
    node.status = "offline"

    # publish management event
    await self._publish_on_node_shutdown(node)

    self.log.info(
        "{action} [oid={node_oid}, session={session_id}, status={status}] {func}",
        action=hl('Warning: managed node "{}" became offline'.format(node_authid), color="red", bold=True),
        node_oid=hlid(node_oid),
        session_id=hlid(session_id),
        status=hlval(node.status),
        func=hltype(self._on_session_shutdown),
    )
[docs]
def _check_node_id(self, node_id, status="online"):
    """
    Look up a node in the run-time map and optionally check its status.

    :param node_id: Key of the node in the run-time map (str-type node object ID).
    :param status: Required node status (default ``"online"``). Pass ``None`` to skip the
        status check.
    :returns: The run-time node representation.
    :raises Exception: When the node is not in the run-time map, or is not in the required status.
    """
    if node_id not in self._nodes:
        # note: Exception() takes no keyword arguments, so the message must be formatted here
        raise Exception('no such node: "{}"'.format(node_id))
    node = self._nodes[node_id]
    if status is not None and node.status != status:
        raise Exception('node "{}" not in status "{}"'.format(node_id, status))
    return node
[docs]
def _check_worker_id(self, node_id, worker_id, status="online"):
    """
    Check that the node hosting a worker is known (and in the given status).

    :param node_id: Key of the node in the run-time map.
    :param worker_id: ID of the worker. NOTE(review): currently not validated at all; only
        the hosting node is checked.
    :param status: Required node status, or ``None`` to skip the status check.
    :raises Exception: As raised by ``_check_node_id``.
    """
    # only the node is verified; the node object returned by the check is discarded
    self._check_node_id(node_id, status=status)
@wamp.register(None, check_types=True)
[docs]
def get_status(self, details: Optional[CallDetails] = None) -> dict:
    """
    Get management realm status.

    :param details: WAMP call details (unused).
    :returns: Status information object with keys:

        - ``"type"``: always ``"management"``;
        - ``"realm"``, ``"started"``, ``"tick"``;
        - ``"now"``: current UTC timestamp string;
        - ``"uptime"``: time since ``started``, in human readable form.
    """
    # take a single timestamp so that "now" and "uptime" are consistent with each other
    now = utcnow()
    uptime_secs = (iso8601.parse_date(now) - iso8601.parse_date(self._started)).total_seconds()
    return {
        "type": "management",
        "realm": self._realm,
        "now": now,
        "started": self._started,
        "uptime": humanize.naturaldelta(uptime_secs),
        "tick": self._tick,
    }
@wamp.register(None, check_types=True)
[docs]
def get_nodes(
    self, status: Optional[str] = None, return_names: Optional[bool] = None, details: Optional[CallDetails] = None
) -> List[str]:
    """
    Returns list of nodes.

    :param status: Filter nodes for this status (``"online"``, ``"offline"``). The status of
        a node is derived like in ``get_node``:

        - a node in the run-time map has the status stored there;
        - a node not in the run-time map is ``"offline"``.

        No filtering is applied when not given.
    :param return_names: Return node names (``authid``) instead of object IDs.
    :param details: WAMP call details (only the caller authid is logged).
    :returns: List of node IDs or node names.
    :raises ApplicationError: ``wamp.error.invalid_argument`` for an unknown ``status``.
    """
    self.log.info(
        "{func}(status={status}, details.caller_authid={caller_authid})",
        status=hlval(status),
        func=hltype(self.get_nodes),
        caller_authid=hlval(details.caller_authid if details else None),
    )

    # reject an unknown filter before doing any database work
    if status and status not in ("online", "offline"):
        raise ApplicationError(
            "wamp.error.invalid_argument", 'invalid status "{}" (must be "online" or "offline")'.format(status)
        )

    with self.gdb.begin() as txn:
        # index keys are (mrealm_oid, authid): scan all keys of this management realm
        from_key = (self._mrealm_oid, "")
        to_key = (uuid.UUID(int=(int(self._mrealm_oid) + 1)), "")
        node_oids = list(
            self.gschema.idx_nodes_by_authid.select(txn, from_key=from_key, to_key=to_key, return_keys=False)
        )

    if status:

        def _runtime_status(oid):
            # run-time map is keyed by str-type UUID; nodes missing from it are offline.
            # Previously, nodes present in the map but marked "offline" were missing from
            # both the "online" and the "offline" result.
            node = self._nodes.get(str(oid), None)
            return node.status if node is not None else "offline"

        node_oids = [oid for oid in node_oids if _runtime_status(oid) == status]

    if return_names:
        names = []
        with self.gdb.begin() as txn:
            for oid in node_oids:
                node = self.gschema.nodes[txn, oid]
                if node and node.authid:
                    names.append(node.authid)
        return names

    return [str(oid) for oid in node_oids]
@wamp.register(None, check_types=True)
[docs]
def get_node(self, node_oid: str, details: Optional[CallDetails] = None) -> dict:
    """
    Return information about node. The procedure will raise an `crossbar.error.no_such_object` error
    when no node with the given object ID can be found.

    :param node_oid: The object ID of the node to get information for, eg `"5616c7cc-31b5-4021-8cd9-b7769d3f0dd3"`.
    :param details: WAMP call details (unused).
    :returns: Node information object. It is the stored node record plus ``"heartbeat"``,
        ``"timestamp"`` and ``"status"`` taken from the run-time map. When the node is not
        in the run-time map, these are ``None``, ``None`` and ``"offline"``.
    :raises ApplicationError:

        - ``wamp.error.invalid_argument`` when ``node_oid`` is not a valid UUID;
        - ``crossbar.error.no_such_object`` when no such node exists.
    """
    try:
        _node_oid = uuid.UUID(node_oid)
    except Exception as e:
        raise ApplicationError("wamp.error.invalid_argument", "invalid node_oid: {}".format(str(e))) from e

    self.log.info('{func}(node_oid="{node_oid}")', node_oid=hlid(_node_oid), func=hltype(self.get_node))

    with self.gdb.begin() as txn:
        node = self.gschema.nodes[txn, _node_oid]

    if not node:
        raise ApplicationError(
            "crossbar.error.no_such_object", "no node with object ID {} in management realm".format(node_oid)
        )

    node_obj = node.marshal()

    # The run-time map is keyed by the canonical str(UUID). Look up by that form, so that
    # other spellings of the same UUID accepted by uuid.UUID (upper case, no dashes, ...)
    # are found as well.
    runtime_node = self._nodes.get(str(_node_oid), None)
    if runtime_node is not None:
        node_obj["heartbeat"] = runtime_node.heartbeat_counter
        node_obj["timestamp"] = runtime_node.heartbeat_time
        node_obj["status"] = runtime_node.status
    else:
        node_obj["heartbeat"] = None
        node_obj["timestamp"] = None
        node_obj["status"] = "offline"

    return node_obj
@wamp.register(None, check_types=True)
[docs]
def get_node_by_authid(self, node_authid: str, details: Optional[CallDetails] = None) -> dict:
    """
    Look up a node of this management realm by the WAMP authid it authenticates under.

    Resolves the authid to the node object ID via the node index, then delegates to
    ``get_node``.

    :param node_authid: The WAMP authid the node is authenticated under.
    :param details: WAMP call details (passed on to ``get_node``).
    :returns: Node information object, as returned by ``get_node``.
    :raises ApplicationError: ``crossbar.error.no_such_object`` when no node has this authid.
    """
    self.log.info(
        '{func}(node_authid="{node_authid}")', node_authid=hlid(node_authid), func=hltype(self.get_node_by_authid)
    )

    with self.gdb.begin() as txn:
        found_oid = self.gschema.idx_nodes_by_authid[txn, (self._mrealm_oid, node_authid)]

    if not found_oid:
        raise ApplicationError(
            "crossbar.error.no_such_object", "no node with authid {} in management realm".format(node_authid)
        )

    return self.get_node(str(found_oid), details)
@inlineCallbacks
[docs]
def _init_trace_api(self):
    """
    Set up forwarding of trace data from managed nodes to management clients.

    Subscribes to ``crossbarfabriccenter.remote.tracing.on_trace_data``. Every trace data
    event for a known trace, coming from one of the trace's ``(node_id, worker_id)`` pairs,
    is republished on ``crossbarfabriccenter.mrealm.tracing.on_trace_data``. The republished
    event is restricted to the trace's reader roles and sent with acknowledgement. Events
    for unknown traces or untraced workers are dropped.
    """

    @inlineCallbacks
    def on_trace_data(node_id, worker_id, trace_id, period, trace_data, details: Optional[CallDetails] = None):
        self.log.debug(
            'Trace "{trace_id}" on node "{node_id}" / worker "{worker_id}":\n\nperiod = {period}\n\ntrace_data = {trace_data}\n\n',
            node_id=node_id,
            worker_id=worker_id,
            trace_id=trace_id,
            period=pprint.pformat(period),
            trace_data=pprint.pformat(trace_data),
        )

        trace = self._traces.get(trace_id, None)
        if not trace:
            return

        # traced_workers presumably holds lists when the trace was created over WAMP, but
        # may hold tuples (as declared by create_trace) when created locally. A list never
        # equals a tuple in Python, so compare element-wise to match both forms.
        if not any(tuple(traced) == (node_id, worker_id) for traced in trace.traced_workers):
            return

        publish_options = PublishOptions(
            eligible_authrole=trace.eligible_reader_roles,
            exclude_authrole=trace.exclude_reader_roles,
            acknowledge=True,
        )
        yield self.publish(
            "crossbarfabriccenter.mrealm.tracing.on_trace_data",
            node_id,
            worker_id,
            trace_id,
            period,
            trace_data,
            options=publish_options,
        )

    yield self.subscribe(
        on_trace_data, "crossbarfabriccenter.remote.tracing.on_trace_data", SubscribeOptions(details=True)
    )
    self.log.debug("central tracing API initialized")
@inlineCallbacks
@wamp.register(None, check_types=True)
[docs]
def get_trace_data(
    self, trace_id: str, limit: Optional[int] = None, details: Optional[CallDetails] = None
) -> Deferred:
    """
    Retrieve trace data of a trace from all of its traced workers.

    For each ``(node_id, worker_id)`` of the trace whose node is online, the trace data is
    fetched from the node. Workers on nodes that are not online are reported as failed.

    :param trace_id: The ID of the trace to retrieve data for.
    :param limit: Optional limit passed on to the remote ``get_trace_data`` call.
    :param details: WAMP call details, used for the reader-role access check.
    :returns: dict ``node_id -> worker_id -> {"success": bool, "data": ...}``. For failed
        workers, ``data`` is the error message (str).
    :raises ApplicationError: ``crossbar.error.no_such_object`` when the trace does not exist
        or the caller's role may not read it.
    """
    self.log.info('get_trace_data(trace_id="{trace_id}", limit="{limit}")', trace_id=trace_id, limit=limit)

    trace = self._traces.get(trace_id, None)
    if not trace:
        raise ApplicationError("crossbar.error.no_such_object", "No trace with ID '{}'".format(trace_id))
    if trace.eligible_reader_roles and details:
        if details.caller_authrole not in trace.eligible_reader_roles:
            raise ApplicationError("crossbar.error.no_such_object", "No trace with ID '{}'".format(trace_id))
    if trace.exclude_reader_roles and details:
        if details.caller_authrole in trace.exclude_reader_roles:
            raise ApplicationError("crossbar.error.no_such_object", "No trace with ID '{}'".format(trace_id))

    dl = []
    for node_id, worker_id in trace.traced_workers:
        node = self._nodes.get(node_id, None)
        if node and node.status == "online":
            d = self.call(
                "crossbarfabriccenter.remote.tracing.get_trace_data", node_id, worker_id, trace_id, 0, limit=limit
            )
            dl.append(d)
        else:
            dl.append(defer.fail(Exception("node not online")))

    # Failures are reported per worker in the result. consumeErrors=True keeps them from
    # additionally being logged as unhandled errors in Deferred.
    trace_data_results = yield DeferredList(dl, consumeErrors=True)

    result: Dict[str, Any] = {}
    for (node_id, worker_id), (success, data) in zip(trace.traced_workers, trace_data_results):
        if not success:
            # a twisted Failure cannot be serialized as a WAMP call result
            data = data.getErrorMessage()
        result.setdefault(node_id, {})[worker_id] = {"success": success, "data": data}

    returnValue(result)
@wamp.register(None, check_types=True)
[docs]
def get_trace(self, trace_id: str, details: Optional[CallDetails] = None) -> Optional[dict]:
    """
    Return detail information about a previously created trace.

    Access is checked against the trace's reader roles. The check only applies when call
    details are given. A caller without read access cannot tell an unauthorized trace from
    a missing one: both yield ``None``.

    :param trace_id: The ID of the trace to retrieve information for.
    :param details: WAMP call details, used for the reader-role access check.
    :returns: The marshalled trace, or ``None`` when the trace does not exist or the caller
        may not read it.
    """
    trace = self._traces.get(trace_id, None)
    if not trace:
        self.log.debug("get_trace({trace_id}) -> no such trace", trace_id=trace_id)
        return None

    if details:
        role = details.caller_authrole
        if trace.eligible_reader_roles and role not in trace.eligible_reader_roles:
            self.log.debug(
                'get_trace({trace_id}) -> trace found, but not authorized (role "{caller_authrole}" is not eligible)!',
                trace_id=trace_id,
                caller_authrole=role,
            )
            return None
        if trace.exclude_reader_roles and role in trace.exclude_reader_roles:
            self.log.debug(
                'get_trace({trace_id}) -> trace found, but not authorized (role "{caller_authrole}" is excluded)!',
                trace_id=trace_id,
                caller_authrole=role,
            )
            return None

    return trace.marshal()
@wamp.register(None, check_types=True)
[docs]
def get_traces(self, details: Optional[CallDetails] = None) -> List[str]:
    """
    Return the sorted IDs of all traces the caller may read.

    A trace is left out when call details are given and either:

    - the trace restricts readers to eligible roles and the caller's role is not among them;
    - the trace excludes the caller's role.

    :param details: WAMP call details, used for the reader-role access check.
    :returns: Sorted list of trace IDs.
    """
    visible = []
    for candidate in self._traces.values():
        if details:
            role = details.caller_authrole
            if candidate.eligible_reader_roles and role not in candidate.eligible_reader_roles:
                self.log.info(
                    'get_traces() -> trace "{trace_id}" found, but not authorized (role "{caller_authrole}" is not eligible)!',
                    trace_id=candidate.trace_id,
                    caller_authrole=role,
                )
                continue
            if candidate.exclude_reader_roles and role in candidate.exclude_reader_roles:
                self.log.info(
                    'get_traces() -> trace "{trace_id}" found, but not authorized (role "{caller_authrole}" is excluded)!',
                    trace_id=candidate.trace_id,
                    caller_authrole=role,
                )
                continue
        visible.append(candidate.trace_id)
    visible.sort()
    return visible
@wamp.register(None, check_types=True)
[docs]
def create_trace(
    self,
    trace_id: str,
    traced_workers: List[Tuple[str, str]],
    trace_options: Optional[Dict] = None,
    eligible_reader_roles: Optional[List[str]] = None,
    exclude_reader_roles: Optional[List[str]] = None,
    details: Optional[CallDetails] = None,
) -> dict:
    """
    Create a new trace, initially in status ``"stopped"``, and announce it.

    The trace is only registered in the run-time trace map; it is not started anywhere
    (``start_trace`` starts traces in status ``"stopped"``). An ``on_trace_created`` event
    is published, restricted to the trace's reader roles.

    :param trace_id: The ID of the trace to create (must be unique within the management realm).
    :param traced_workers: A list of pairs `(node_id, worker_id)` with node and (router) worker IDs
        on which the trace is to be run.
    :param trace_options: Tracing options for the trace.
    :param eligible_reader_roles: If given, allow read access to the trace only for callers
        authenticated under a WAMP authrole FROM this list - otherwise allow any role (=public)!
    :param exclude_reader_roles: If given, allow read access to the trace only for callers
        authenticated under a WAMP authrole NOT FROM this list - otherwise allow any role (=public)!
    :param details: WAMP call details (unused).
    :returns: Trace creation information (currently an empty dict).
    :raises Exception: When a trace with this ID already exists.
    """
    if trace_id in self._traces:
        existing_status = self._traces[trace_id].status
        raise Exception('trace with ID "{}" already exists (status "{}")'.format(trace_id, existing_status))

    new_trace = Trace(trace_id, traced_workers, trace_options, eligible_reader_roles, exclude_reader_roles, "stopped")
    self._traces[trace_id] = new_trace

    # FIXME: fill in actual trace creation information
    trace_created: Dict[str, Any] = {}

    topic = "{}tracing.on_trace_created".format(self._uri_prefix)
    options = PublishOptions(
        eligible_authrole=new_trace.eligible_reader_roles,
        exclude_authrole=new_trace.exclude_reader_roles,
    )
    self.publish(topic, trace_id, trace_created, options=options)
    return trace_created
@inlineCallbacks
@wamp.register(None, check_types=True)
[docs]
def start_trace(self, trace_id: str, details: Optional[CallDetails] = None) -> Deferred:
    """
    Start a previously created (and currently stopped) trace on all of its traced workers.

    For every ``(node_id, worker_id)`` of the trace whose node is online:

    - a still-running remote trace with the same ID is stopped first, so that no data from
      orphaned traces is received;
    - a fresh remote trace is started with the trace's options.

    Workers on nodes that are not online, or where starting fails, are reported as failed.
    ``on_trace_starting`` and ``on_trace_started`` events are published, restricted to the
    trace's reader roles.

    :param trace_id: The ID of the trace to start.
    :param details: WAMP call details (unused).
    :returns: dict with two lists:

        - ``"started"``: the per-worker start results;
        - ``"failed"``: dicts describing the workers the trace could not be started on.
    :raises Exception: When no such trace exists, or it is not in status ``"stopped"``.
    """
    trace = self._traces.get(trace_id, None)
    if not trace:
        raise Exception('no trace with ID "{}" exists'.format(trace_id))
    if trace.status != "stopped":
        raise Exception('cannot start trace with ID "{}" currently in status "{}"'.format(trace_id, trace.status))

    trace.status = "starting"
    publish_options = PublishOptions(
        eligible_authrole=trace.eligible_reader_roles, exclude_authrole=trace.exclude_reader_roles
    )
    self.publish("{}tracing.on_trace_starting".format(self._uri_prefix), trace_id, options=publish_options)

    traces_started = []
    traces_failed = []
    for node_id, worker_id in trace.traced_workers:
        node = self._nodes.get(node_id, None)
        if node and node.status == "online":
            try:
                # stop any currently online traces (we don't want to get data from orphaned traces)
                traces = yield self.call("crossbarfabriccenter.remote.tracing.get_traces", node_id, worker_id)
                if trace_id in traces:
                    _trace = yield self.call(
                        "crossbarfabriccenter.remote.tracing.get_trace", node_id, worker_id, trace_id
                    )
                    if _trace["status"] == "running":
                        trace_stopped = yield self.call(
                            "crossbarfabriccenter.remote.tracing.stop_trace", node_id, worker_id, trace_id
                        )
                        self.log.info(
                            'Trace "{trace_id}" on node "{node_id}" / worker "{worker_id}" stopped:\n{trace_stopped}',
                            trace_stopped=trace_stopped,
                            trace_id=trace_id,
                            node_id=node_id,
                            worker_id=worker_id,
                        )

                # start fresh trace
                worker_trace_started = yield self.call(
                    "crossbarfabriccenter.remote.tracing.start_trace",
                    node_id,
                    worker_id,
                    trace_id,
                    trace_options=trace.trace_options,
                )
                self.log.info(
                    'Trace "{trace_id}" on node "{node_id}" / worker "{worker_id}" started with options {trace_options}:\n{trace_started}',
                    node_id=node_id,
                    worker_id=worker_id,
                    trace_id=trace_id,
                    trace_options=trace.trace_options,
                    trace_started=worker_trace_started,
                )
                traces_started.append(worker_trace_started)
            except Exception as e:
                self.log.failure()
                traces_failed.append(
                    {"node_id": node_id, "node_status": node.status, "worker_id": worker_id, "error": str(e)}
                )
        else:
            node_status = node.status if node else "offline"
            self.log.warn(
                'trace "{trace_id}": skipping to start trace on node "{node_id}" in status "{status}"',
                node_id=node_id,
                trace_id=trace_id,
                status=node_status,
            )
            traces_failed.append({"node_id": node_id, "node_status": node_status, "worker_id": worker_id})

    # NOTE(review): the trace is marked "running" even if it failed to start on some (or all) workers
    trace.status = "running"

    trace_started = {
        "started": traces_started,
        "failed": traces_failed,
    }
    self.publish(
        "{}tracing.on_trace_started".format(self._uri_prefix), trace_id, trace_started, options=publish_options
    )
    returnValue(trace_started)
@inlineCallbacks
@wamp.register(None, check_types=True)
[docs]
def stop_trace(self, trace_id: str, details: Optional[CallDetails] = None) -> Deferred:
    """
    Stop a running trace on all of its traced workers.

    For every ``(node_id, worker_id)`` of the trace whose node is online, the remote trace
    is stopped if it exists there and is running. Workers on nodes that are not online, or
    where stopping raises, are reported as failed. When anything failed, the trace ends up
    in status ``"stopping_failed"`` (from which stopping can be retried); otherwise it ends
    up in ``"stopped"``. ``on_trace_stopping`` and ``on_trace_stopped`` events are published,
    restricted to the trace's reader roles.

    :param trace_id: The ID of the trace to stop.
    :param details: WAMP call details (unused).
    :returns: dict with two lists:

        - ``"stopped"``: the per-worker stop results;
        - ``"failed"``: dicts describing the workers the trace could not be stopped on.
    :raises Exception: When no such trace exists, or it is not in status ``"running"`` or
        ``"stopping_failed"``.
    """
    trace = self._traces.get(trace_id, None)
    if not trace:
        raise Exception('no trace with ID "{}" exists'.format(trace_id))
    if trace.status not in ["running", "stopping_failed"]:
        raise Exception('cannot stop trace with ID "{}" currently in status "{}"'.format(trace_id, trace.status))

    trace.status = "stopping"
    publish_options = PublishOptions(
        eligible_authrole=trace.eligible_reader_roles, exclude_authrole=trace.exclude_reader_roles
    )
    self.publish("{}tracing.on_trace_stopping".format(self._uri_prefix), trace_id, options=publish_options)

    stopped_list = []
    failed_list = []
    for node_id, worker_id in trace.traced_workers:
        node = self._nodes.get(node_id, None)

        if not (node and node.status == "online"):
            offline_status = node.status if node else "offline"
            self.log.warn(
                'trace "{trace_id}": skipping to stop trace on node "{node_id}" in status "{status}"',
                node_id=node_id,
                trace_id=trace_id,
                status=offline_status,
            )
            failed_list.append({"node_id": node_id, "node_status": offline_status, "worker_id": worker_id})
            continue

        try:
            remote_traces = yield self.call("crossbarfabriccenter.remote.tracing.get_traces", node_id, worker_id)
            if trace_id not in remote_traces:
                continue
            remote_trace = yield self.call(
                "crossbarfabriccenter.remote.tracing.get_trace", node_id, worker_id, trace_id
            )
            if remote_trace["status"] != "running":
                continue
            worker_stopped = yield self.call(
                "crossbarfabriccenter.remote.tracing.stop_trace", node_id, worker_id, trace_id
            )
            stopped_list.append(worker_stopped)
            self.log.info(
                'Trace "{trace_id}" on node "{node_id}" / worker "{worker_id}" stopped:\n{trace_stopped}',
                trace_stopped=worker_stopped,
                trace_id=trace_id,
                node_id=node_id,
                worker_id=worker_id,
            )
        except Exception as e:
            self.log.failure()
            failed_list.append(
                {"node_id": node_id, "node_status": node.status, "worker_id": worker_id, "error": str(e)}
            )

    trace.status = "stopping_failed" if failed_list else "stopped"

    trace_stopped = {
        "stopped": stopped_list,
        "failed": failed_list,
    }
    self.publish(
        "{}tracing.on_trace_stopped".format(self._uri_prefix), trace_id, trace_stopped, options=publish_options
    )
    returnValue(trace_stopped)
@wamp.register(None, check_types=True)
[docs]
def delete_trace(self, trace_id: str, details: Optional[CallDetails] = None) -> dict:
    """
    Delete a trace that is not running, and announce its deletion.

    Only traces in status ``"stopped"`` or ``"stopping_failed"`` can be deleted. An
    ``on_trace_deleted`` event is published, restricted to the trace's reader roles, before
    the trace is removed from the run-time trace map.

    :param trace_id: The ID of the trace to delete.
    :param details: WAMP call details (unused).
    :returns: Trace deletion information (currently an empty dict).
    :raises Exception: When no such trace exists, or it is in a status that does not allow deletion.
    """
    trace = self._traces.get(trace_id, None)
    if not trace:
        raise Exception('no trace with ID "{}" exists'.format(trace_id))

    deletable_states = ["stopped", "stopping_failed"]
    if trace.status not in deletable_states:
        raise Exception('cannot delete trace with ID "{}" currently in status "{}"'.format(trace_id, trace.status))

    # FIXME: fill in actual trace deletion information
    trace_deleted: Dict[str, Any] = {}

    topic = "{}tracing.on_trace_deleted".format(self._uri_prefix)
    options = PublishOptions(
        eligible_authrole=trace.eligible_reader_roles,
        exclude_authrole=trace.exclude_reader_roles,
    )
    self.publish(topic, trace_id, trace_deleted, options=options)

    self._traces.pop(trace_id)
    return trace_deleted