# Source code for crossbar.edge.node.management

##############################################################################
#
#                        Crossbar.io
#     Copyright (C) typedef int GmbH. All rights reserved.
#
##############################################################################

import pprint

import numpy as np
import txaio
from autobahn.twisted.wamp import ApplicationSession
from autobahn.wamp.exception import ApplicationError, TransportLost
from autobahn.wamp.types import PublishOptions, SubscribeOptions
from twisted.internet.defer import inlineCallbacks, succeed
from twisted.internet.task import LoopingCall
from txaio import make_logger, time_ns

from crossbar._util import hl, hlid, hltype

__all__ = ("NodeManagementSession", "NodeManagementBridgeSession")


class NodeManagementSession(ApplicationSession):
    """
    Uplink WAMP session from this node to Crossbar.io Master.

    The session authenticates with WAMP-cryptosign under authrole ``node``; realm and
    authid are not requested but assigned by the master. Successful joining fires the
    ``on_ready`` Deferred from ``config.extra``; leaving fires ``on_exit``.
    """

    log = make_logger()

    def onUserError(self, fail, msg):
        """
        Implements :func:`autobahn.wamp.interfaces.ISession.onUserError`

        WAMP application errors are only logged at debug level (using the error's own
        message); any other failure is logged as an error together with its traceback.
        """
        klass = self.__class__.__name__
        err = fail.value
        if not isinstance(err, ApplicationError):
            self.log.error(
                '{klass}.onUserError(): "{msg}"\n{traceback}',
                klass=klass,
                msg=msg,
                traceback=txaio.failure_format_traceback(fail),
            )
            return
        self.log.debug('{klass}.onUserError(): "{msg}"', klass=klass, msg=err.error_message())

    def __init__(self, runner, config=None):
        ApplicationSession.__init__(self, config)
        # runner that owns this session; stopped in onLeave() for leave reasons where
        # reconnecting is pointless
        self._runner = runner

    def onConnect(self):
        """
        Request to join using WAMP-cryptosign with authrole ``node``.
        """
        self.log.debug("{klass}.onConnect()", klass=self.__class__.__name__)

        node_key = self.config.extra["node"].secmod[1]
        authextra = dict(
            # our public key: lets the router identify us without an explicit authid
            pubkey=node_key.public_key(binary=False),
            # not yet implemented: a public key for which the router should present a
            # trustchain (trustroot e.g. hard-coded or from a command line option)
            trustroot=None,
            # not yet implemented: a hex-encoded 32 byte random value the router would
            # sign and return in AUTHENTICATE so the client can authenticate the router
            challenge=None,
            # https://tools.ietf.org/html/rfc5929
            channel_binding="tls-unique",
        )

        # realm and authid are both decided by Crossbar.io Master, so neither is sent;
        # authrole "node" is mandatory. The assigned authid doubles as node_id.
        self.join(realm=None, authrole="node", authmethods=["cryptosign"], authextra=authextra)

    def onChallenge(self, challenge):
        """
        Answer a WAMP-cryptosign challenge by signing it with the node private key.

        :raises Exception: if the router challenges with any other auth method.
        """
        self.log.debug(
            "{klass}.onChallenge(challenge={challenge})", klass=self.__class__.__name__, challenge=challenge
        )

        if challenge.method != "cryptosign":
            raise Exception(
                "internal error: we asked to authenticate using wamp-cryptosign, but now received a challenge for {}".format(
                    challenge.method
                )
            )

        # not yet implemented: verify the router trustchain against our trustroot and the
        # router's signature over our own challenge.

        # bind the signature to the underlying channel when a binding type is configured
        binding_type = self.config.extra.get("channel_binding", None)
        binding_id = self.transport.transport_details.channel_id.get(binding_type, None)

        node_key = self.config.extra["node"].secmod[1]
        return node_key.sign_challenge(challenge, channel_id=binding_id, channel_id_type=binding_type)

    def onJoin(self, details):
        """
        Fire ``config.extra["on_ready"]`` with the session and the identity assigned by the master.

        The callback value is ``(session, realm, session_id, authid, authextra)``.

        :raises Exception: if no ``on_ready`` Deferred is configured or it already fired.
        """
        self.log.info("{func}(details={details})", func=hltype(self.onJoin), details=details)

        extra = self.config.extra
        if not (extra and "on_ready" in extra):
            raise Exception("internal error: no on_ready callback provided")

        on_ready = extra["on_ready"]
        if on_ready.called:
            raise Exception("internal error: on_ready callback already called when we expected it was not")

        # realm: auto-assigned management realm; authid: auto-assigned node_id
        on_ready.callback((self, details.realm, details.session, details.authid, details.authextra))

    def onLeave(self, details):
        """
        Handle leaving the management realm.

        The runner is stopped for leave reasons where reconnecting cannot help.
        ``on_ready`` (if still pending) is errbacked and ``on_exit`` is fired with the
        leave reason, then the session disconnects.

        :raises Exception: if no ``on_exit`` Deferred is configured or it already fired.
        """
        self.log.debug("{klass}.onLeave(details={details})", klass=self.__class__.__name__, details=details)

        # user needs to get active and pair the node first: no point in auto-reconnecting
        fatal_reasons = ("fabric.auth-failed.node-unpaired", "fabric.auth-failed.node-already-connected")
        if details.reason in fatal_reasons:
            self._runner.stop()

        extra = self.config.extra

        if extra and "on_ready" in extra and not extra["on_ready"].called:
            extra["on_ready"].errback(ApplicationError(details.reason, details.message))

        if not (extra and "on_exit" in extra):
            raise Exception("internal error: no on_exit callback provided")

        on_exit = extra["on_exit"]
        if on_exit.called:
            raise Exception("internal error: on_exit callback already called when we expected it was not")
        on_exit.callback(details.reason)

        self.disconnect()

    def onDisconnect(self):
        """
        Log the disconnect; shutting down the node controller is currently disabled.
        """
        self.log.debug("{klass}.onDisconnect()", klass=self.__class__.__name__)
        node = self.config.extra["node"]

        # FIXME: the node shutdown behavior should be more sophisticated than this!
        shutdown_on_cfc_lost = False
        if shutdown_on_cfc_lost and node._controller:
            node._controller.shutdown()
class NodeManagementBridgeSession(ApplicationSession):
    """
    The management bridge is a WAMP session that lives on the local management router,
    but has access to a 2nd WAMP session that lives on the uplink CFC router.

    Once the uplink session (the "manager") has been attached with :meth:`attach_manager`,
    the bridge:

    * forwards calls from CFC into the local node: every local ``crossbar.*`` procedure
      is mirrored as ``crossbarfabriccenter.node.<node_id>.*`` on the uplink,
    * forwards events published locally under ``crossbar.*`` uplink to CFC, and
    * periodically publishes node and worker heartbeats to CFC.
    """

    log = make_logger()

    def __init__(self, config):
        ApplicationSession.__init__(self, config)

        # uplink (CFC) session and the identity CFC assigned to this node
        self._manager = None
        self._management_realm = None
        self._node_id = None

        # local registration ID -> uplink registration of the corresponding forwarder
        self._regs = {}

        self._authrole = "trusted"
        self._authmethod = "trusted"

        # local subscriptions driving event and call forwarding
        self._sub_on_mgmt = None
        self._sub_on_reg_create = None
        self._sub_on_reg_delete = None

        # heartbeat state: timestamp and sequence number of the current period,
        # the running LoopingCall, and the pending delayed start of that loop
        self._heartbeat_time_ns = None
        self._heartbeat = 0
        self._heartbeat_call = None
        self._heartbeat_start_call = None

    def onJoin(self, details):
        self.log.debug("{klass}.onJoin(details={details})", klass=self.__class__.__name__, details=details)

    @inlineCallbacks
    def attach_manager(self, manager, management_realm, node_id):
        """
        Attach management uplink session when the latter has been fully established and is ready to be used.

        Starts call and event forwarding right away. Heartbeating starts after
        ``fabric.heartbeat.startup_delay`` seconds (default 5).

        :param manager: uplink session.
        :type manager: instance of `autobahn.wamp.protocol.ApplicationSession`

        :param management_realm: The management realm that was assigned by CFC to this node.
        :type management_realm: unicode

        :param node_id: The node ID that was assigned by CFC to this node.
        :type node_id: unicode

        :raises Exception: if a manager is already attached.
        """
        if self._manager:
            raise Exception(
                "{}.attach_manager: interal error: manager already attached!".format(self.__class__.__name__)
            )

        self._manager = manager
        self._management_realm = management_realm
        self._node_id = node_id

        self._controller_config = self.config.extra["controller_config"]

        fabric = self._controller_config.get("fabric", {})
        heartbeat = fabric.get("heartbeat", {})
        self._heartbeat_startup_delay = heartbeat.get("startup_delay", 5)
        self._heartbeat_heartbeat_period = heartbeat.get("heartbeat_period", 10)
        self._heartbeat_include_system_stats = heartbeat.get("include_system_stats", True)
        self._heartbeat_send_workers_heartbeats = heartbeat.get("send_workers_heartbeats", True)
        self._heartbeat_aggregate_workers_heartbeats = heartbeat.get("aggregate_workers_heartbeats", True)

        yield self._start_call_forwarding()
        yield self._start_event_forwarding()

        # start heartbeating a bit later (we currently run into workers not fully being up otherwise).
        # Keep the handle, so that a detach before the delay has elapsed cancels the start.
        from twisted.internet import reactor

        self._heartbeat_start_call = reactor.callLater(self._heartbeat_startup_delay, self._start_cfc_heartbeat)

        self.log.info(
            '{klass}.attach_manager: manager attached as node "{node_id}" on management realm "{management_realm}")',
            klass=self.__class__.__name__,
            node_id=self._node_id,
            management_realm=self._management_realm,
        )

        self.log.info("Controller configuration: {controller_config}", controller_config=self._controller_config)

    @inlineCallbacks
    def detach_manager(self):
        """
        Detach management uplink session (eg when that session has been lost).

        Stops heartbeating, event forwarding and call forwarding (each step best-effort:
        failures are logged and the next step still runs), then forgets the manager.
        """
        if not self._manager:
            self.log.debug(
                "{klass}.detach_manager: no manager manager currently attached", klass=self.__class__.__name__
            )
            return

        # remember the identity for the final log message, before it is reset below
        node_id = self._node_id
        management_realm = self._management_realm

        try:
            self._stop_cfc_heartbeat()
        except Exception:
            self.log.failure()

        try:
            yield self._stop_event_forwarding()
        except Exception:
            self.log.failure()

        try:
            yield self._stop_call_forwarding()
        except Exception:
            self.log.failure()

        self._manager = None
        self._management_realm = None
        self._node_id = None

        self.log.info(
            '{klass}.detach_manager: manager detached for node "{node_id}" on management realm "{management_realm}")',
            klass=self.__class__.__name__,
            node_id=node_id,
            management_realm=management_realm,
        )

    def _translate_uri(self, uri):
        """
        Translate a local URI (one that is used on the local node management router)
        to a remote URI (one used on the uplink management session at the CFC router
        for the management realm).

        Example:

            crossbar.worker.worker-001.start_manhole
            ->
            crossbarfabriccenter.node.<node_id>.worker.<worker_id>.start_manhole

        :raises Exception: if ``uri`` does not start with ``crossbar.``.
        """
        # the local URI prefix under which the management API is registered
        _PREFIX = "crossbar."

        # the remote (==CFC) URI prefix under which the management API is registered
        _TARGET_PREFIX = "crossbarfabriccenter.node"

        if uri.startswith(_PREFIX):
            suffix = uri[len(_PREFIX) :]
            mapped_uri = ".".join([_TARGET_PREFIX, self._node_id, suffix])
            self.log.debug(
                "mapped URI {uri} to {mapped_uri} [suffix={suffix}]", uri=uri, mapped_uri=mapped_uri, suffix=suffix
            )
            return mapped_uri

        raise Exception("don't know how to translate URI {}".format(uri))

    @staticmethod
    def _drop_process_attrs(status):
        """
        Remove the keys ts, timestamp, user, name, cmdline and created from ``status`` (in place).
        """
        for k in ["ts", "timestamp", "user", "name", "cmdline", "created"]:
            status.pop(k, None)

    @staticmethod
    def _aggregate_router_stats(router_stats):
        """
        Sum per-realm router statistics into one summary for the whole router worker.

        :param router_stats: result of ``crossbar.worker.<id>.get_router_realm_stats``;
            presumably realm -> {"sessions", "roles", "messages": {direction: {msg_type: count}}}
            (shape inferred from the accesses below).
        :returns: dict with ``messages`` (counts per message type, summed over all realms
            and directions), ``sessions`` and ``roles`` (summed over all realms).
        """
        mstats = {}
        stats_sessions = 0
        stats_roles = 0
        for realm_stats in router_stats.values():
            stats_sessions += realm_stats["sessions"]
            stats_roles += realm_stats["roles"]
            for counters in realm_stats["messages"].values():
                for k, count in counters.items():
                    mstats[k] = mstats.get(k, 0) + count
        return {
            "messages": mstats,
            "sessions": stats_sessions,
            "roles": stats_roles,
        }

    @inlineCallbacks
    def _send_heartbeat(self):
        """
        Publish one node heartbeat and (if configured) one heartbeat per worker uplink.

        Reads ``self._heartbeat_time_ns`` and ``self._heartbeat`` (sequence number),
        which the loop in :meth:`_start_cfc_heartbeat` sets before every call. Only logs
        when no uplink session is attached.

        Note: this is an ``inlineCallbacks`` generator, so it must not return a Deferred
        (``Deferred.callback`` rejects Deferred results); it simply returns ``None``.
        """
        if not (self._manager and self._manager.is_attached()):
            self.log.info(
                "Skipped sending management link heartbeat for period {period} (not connected)", period=self._heartbeat
            )
            return

        # get basic status
        status = yield self.call("crossbar.get_status")
        obj = {
            "timestamp": self._heartbeat_time_ns,
            "period": self._heartbeat_heartbeat_period,
            "mrealm_id": self._management_realm,
            "seq": self._heartbeat,
            "workers": status.get("workers_by_type", {}),
        }

        controller_status = None
        try:
            controller_status = yield self.call("crossbar.get_process_monitor")
        except ApplicationError as e:
            if e.error != "wamp.error.no_such_procedure":
                raise
            self.log.info(
                "Failed to retrieve controller process statistics for period {period} - controller procedure unavailable",
                period=self._heartbeat,
            )
        else:
            self._drop_process_attrs(controller_status)
            controller_status["timestamp"] = self._heartbeat_time_ns
            controller_status["period"] = self._heartbeat_heartbeat_period
            controller_status["mrealm_id"] = self._management_realm
            controller_status["seq"] = self._heartbeat
            controller_status["type"] = "controller"
            # MWorkerState: "online and fully operational" == 1
            controller_status["state"] = 1
            obj["workers"]["controller"] = 1

        if self._heartbeat_include_system_stats:
            obj["system"] = yield self.call("crossbar.get_system_stats")

        # MNodeState: "online and fully operational" == 1
        obj["state"] = 1

        try:
            if self._manager:
                yield self._manager.publish(
                    "crossbarfabriccenter.node.on_heartbeat",
                    self._node_id,
                    obj,
                    options=PublishOptions(acknowledge=True),
                )
                self.log.debug(
                    'Node heartbeat sent [node_id="{node_id}", timestamp="{timestamp}", seq={seq}]',
                    timestamp=np.datetime64(obj["timestamp"], "ns"),
                    seq=obj["seq"],
                    node_id=self._node_id,
                    obj=obj,
                )
                self.log.debug("{heartbeat}", heartbeat=pprint.pformat(obj))
            else:
                self.log.warn(
                    "Skipped sending management link node heartbeat for period {period} - no management uplink",
                    period=self._heartbeat,
                )
        except Exception:
            # best effort: a failed node heartbeat must not prevent the worker heartbeats
            self.log.warn("Failed to send management link node heartbeat for period {period}:", period=self._heartbeat)
            self.log.failure()

        if self._heartbeat_send_workers_heartbeats:
            yield self._send_workers_heartbeats(controller_status)

    @inlineCallbacks
    def _send_workers_heartbeats(self, controller_status):
        """
        Publish one heartbeat per native worker (plus the controller) uplink.

        :param controller_status: controller process status prepared by
            :meth:`_send_heartbeat`, or ``None`` if it could not be retrieved (the
            controller is then left out instead of crashing the loop).
        """
        workers = {}
        native_worker_types = ["router", "container", "proxy", "xbrmm", "hostmonitor"]
        worker_ids = yield self.call("crossbar.get_workers", filter_types=native_worker_types)

        for worker_id in worker_ids:
            worker_status = {}
            try:
                worker_status["process"] = yield self.call("crossbar.worker.{}.get_process_monitor".format(worker_id))
                self.log.debug("Native worker status: {status}", status=worker_status)
            except ApplicationError as e:
                if e.error != "wamp.error.no_such_procedure":
                    raise
                self.log.warn(
                    'Failed to retrieve worker process statistics for worker "{worker_id}" in period {period} ("worker procedure unavailable")',
                    worker_id=worker_id,
                    period=self._heartbeat,
                )
            else:
                # NOTE(review): at this point worker_status only holds "process", so this is a no-op.
                # Possibly _drop_process_attrs(worker_status["process"]) was intended (as done for
                # the controller); kept unchanged so the published payload does not change.
                self._drop_process_attrs(worker_status)
                worker_status["timestamp"] = self._heartbeat_time_ns
                worker_status["period"] = self._heartbeat_heartbeat_period
                worker_status["mrealm_id"] = self._management_realm
                worker_status["seq"] = self._heartbeat
                worker_status["type"] = worker_status["process"]["type"]
                # MWorkerState: "online and fully operational" == 1
                worker_status["state"] = 1
                del worker_status["process"]["type"]
                workers[worker_id] = worker_status

        # without controller process statistics there is nothing to publish for it
        # (a None entry would fail on worker_status["type"] below)
        if controller_status is not None:
            workers["controller"] = controller_status

        for worker_id, worker_status in workers.items():
            # if worker is of type "router", expand with router statistics
            if worker_status["type"] == "router":
                try:
                    router_stats = yield self.call("crossbar.worker.{}.get_router_realm_stats".format(worker_id))
                    self.log.debug("Router worker status: {router_stats}", router_stats=router_stats)
                except ApplicationError as e:
                    if e.error != "wamp.error.no_such_procedure":
                        raise
                    self.log.warn(
                        "Failed to retrieve router statistics for period {period} - worker procedure unavailable",
                        period=self._heartbeat,
                    )
                else:
                    if self._heartbeat_aggregate_workers_heartbeats:
                        worker_status["router"] = self._aggregate_router_stats(router_stats)
                    else:
                        worker_status["router"] = router_stats

            # now publish the worker heartbeat management event
            try:
                if self._manager:
                    yield self._manager.publish(
                        "crossbarfabriccenter.node.on_worker_heartbeat",
                        self._node_id,
                        str(worker_id),
                        worker_status,
                        options=PublishOptions(acknowledge=True),
                    )
                    self.log.debug(
                        'Worker heartbeat sent [node_id="{node_id}", worker_id="{worker_id}", timestamp="{timestamp}", seq={seq}]',
                        node_id=self._node_id,
                        worker_id=worker_id,
                        timestamp=np.datetime64(worker_status["timestamp"], "ns"),
                        seq=worker_status["seq"],
                    )
                    self.log.debug("{heartbeat}", heartbeat=pprint.pformat(worker_status))
                else:
                    self.log.warn(
                        "Skipped sending management link node heartbeat for period {period} - no management uplink",
                        period=self._heartbeat,
                    )
            except TransportLost:
                self.log.info(
                    "Failed to send management link worker heartbeat for period {period} - transport lost",
                    period=self._heartbeat,
                )

    def _start_cfc_heartbeat(self):
        """
        Start the periodic heartbeat loop (first heartbeat is sent immediately).

        Each iteration stamps a new period (``time_ns()``) and increments the sequence
        number before calling :meth:`_send_heartbeat`; errors are logged, not propagated.
        """
        # the delayed start scheduled by attach_manager() (if any) has fired now
        self._heartbeat_start_call = None

        # never run two heartbeat loops side by side
        if self._heartbeat_call is not None and self._heartbeat_call.running:
            self._heartbeat_call.stop()

        self.log.info(
            "Starting management heartbeat .. [period={period} seconds]", period=hlid(self._heartbeat_heartbeat_period)
        )

        self._heartbeat_time_ns = None
        self._heartbeat = 0

        @inlineCallbacks
        def publish():
            self._heartbeat_time_ns = time_ns()
            self._heartbeat += 1
            try:
                yield self._send_heartbeat()
            except Exception:
                self.log.failure()

        self._heartbeat_call = LoopingCall(publish)
        self._heartbeat_call.start(self._heartbeat_heartbeat_period)

    def _stop_cfc_heartbeat(self):
        """
        Stop heartbeating: cancel a pending delayed start and stop a running loop.
        """
        if self._heartbeat_start_call is not None:
            if self._heartbeat_start_call.active():
                self._heartbeat_start_call.cancel()
            self._heartbeat_start_call = None

        if self._heartbeat_call:
            # LoopingCall.stop() must only be called on a running loop
            if self._heartbeat_call.running:
                self._heartbeat_call.stop()
            self._heartbeat_call = None

        self._heartbeat_time_ns = None
        self._heartbeat = 0

    @inlineCallbacks
    def _start_event_forwarding(self):
        """
        Forward events published locally under ``crossbar.*`` to the uplink CFC session.
        """

        @inlineCallbacks
        def on_management_event(*args, **kwargs):
            if not (self._manager and self._manager.is_attached()):
                self.log.warn("Can't foward management event: CFC session not attached")
                return

            details = kwargs.pop("details")

            # a node local event such as 'crossbar.node.on_ready' is republished as
            # 'crossbarfabriccenter.node.<node_id>.node.on_ready' (one reason is that URIs such
            # as 'wamp.*' and 'crossbar.*' are restricted to trusted sessions, and the management
            # bridge is connecting over network to the uplink CFC and hence can't be trusted)
            topic = self._translate_uri(details.topic)

            try:
                yield self._manager.publish(topic, *args, options=PublishOptions(acknowledge=True), **kwargs)
            except Exception:
                self.log.failure(
                    "Failed to forward event on topic '{topic}': {log_failure.value}",
                    topic=topic,
                )
            else:
                self.log.debug(
                    "Forwarded management {forward_type} to CFC [local_uri={local_topic}, remote_uri={remote_topic}]",
                    forward_type=hl("EVENT"),
                    local_topic=hlid(details.topic),
                    remote_topic=hlid(topic),
                )

        try:
            sub = self._sub_on_mgmt = yield self.subscribe(
                on_management_event, "crossbar.", options=SubscribeOptions(match="prefix", details_arg="details")
            )
            self.log.debug("Setup prefix subscription to forward node management events: {sub}", sub=sub)
        except Exception:
            self.log.failure()

    @inlineCallbacks
    def _stop_event_forwarding(self):
        """
        Stop forwarding local management events uplink.
        """
        if self._sub_on_mgmt:
            yield self._sub_on_mgmt.unsubscribe()
            self._sub_on_mgmt = None

    @inlineCallbacks
    def _start_call_forwarding(self):
        """
        Mirror local ``crossbar.*`` procedures as forwarding procedures on the uplink.

        Uses the WAMP meta API of the local router: existing registrations are forwarded
        right away, and ``wamp.registration.on_create`` / ``on_delete`` keep the uplink
        in sync afterwards.
        """

        @inlineCallbacks
        def on_registration_create(session_id, registration):
            # we use the WAMP meta API implemented by CB to get notified whenever a procedure is
            # registered/unregister on the node management router, setup a forwarding procedure
            # and register that on the uplink CFC router
            if not (self._manager and self._manager.is_attached()):
                self.log.warn("Can't create forward management registration: CFC session not attached")
                return

            local_uri = registration["uri"]
            if not local_uri.startswith("crossbar."):
                # only the management API is forwarded (and _translate_uri() would raise otherwise)
                self.log.debug("skipped {uri}", uri=local_uri)
                return

            remote_uri = self._translate_uri(local_uri)

            self.log.debug(
                "Setup management API forwarding: {remote_uri} -> {local_uri}",
                remote_uri=remote_uri,
                local_uri=local_uri,
            )

            def forward_call(*args, **kwargs):
                kwargs.pop("details", None)
                self.log.debug(
                    "Forwarding management {forward_type} from CFC .. [remote_uri={remote_uri}, local_uri={local_uri}]",
                    forward_type=hl("CALL"),
                    local_uri=hlid(local_uri),
                    remote_uri=hlid(remote_uri),
                )
                return self.call(local_uri, *args, **kwargs)

            try:
                reg = yield self._manager.register(forward_call, remote_uri)
            except Exception:
                self.log.failure(
                    "Failed to register management procedure '{remote_uri}': {log_failure.value}",
                    remote_uri=remote_uri,
                )
            else:
                self._regs[registration["id"]] = reg
                self.log.debug("Management procedure registered: '{remote_uri}'", remote_uri=reg.procedure)

        self._sub_on_reg_create = yield self.subscribe(on_registration_create, "wamp.registration.on_create")

        # stop forwarding future registrations
        @inlineCallbacks
        def on_registration_delete(session_id, registration_id):
            if not (self._manager and self._manager.is_attached()):
                self.log.debug("Can't delete forward management registration: CFC session not attached")
                return

            reg = self._regs.pop(registration_id, None)

            if reg:
                yield reg.unregister()
                self.log.debug("Management procedure unregistered: '{remote_uri}'", remote_uri=reg.procedure)
            else:
                self.log.warn(
                    "Could not remove forwarding for unmapped registration_id {reg_id}", reg_id=registration_id
                )

        self._sub_on_reg_delete = yield self.subscribe(on_registration_delete, "wamp.registration.on_delete")

        # start forwarding current registrations
        res = yield self.call("wamp.registration.list")
        for reg_ids in res.values():
            for reg_id in reg_ids:
                registration = yield self.call("wamp.registration.get", reg_id)
                if registration["uri"].startswith("crossbar."):
                    yield on_registration_create(None, registration)
                else:
                    # eg skip WAMP meta API procs like "wamp.session.list"
                    self.log.debug("skipped {uri}", uri=registration["uri"])

    @inlineCallbacks
    def _stop_call_forwarding(self):
        """
        Stop call forwarding and drop all forwarding procedures.

        The uplink registrations are unregistered best-effort (only while the manager is
        still attached); the mapping is always cleared so that a later attach starts fresh.
        """
        if self._sub_on_reg_create:
            yield self._sub_on_reg_create.unsubscribe()
            self._sub_on_reg_create = None

        if self._sub_on_reg_delete:
            yield self._sub_on_reg_delete.unsubscribe()
            self._sub_on_reg_delete = None

        regs, self._regs = self._regs, {}
        if self._manager and self._manager.is_attached():
            for reg in regs.values():
                try:
                    yield reg.unregister()
                except Exception:
                    self.log.failure(
                        "Failed to unregister management procedure '{remote_uri}': {log_failure.value}",
                        remote_uri=reg.procedure,
                    )