Source code for crossbar.master.api.wamp

###############################################################################
#
# Crossbar.io Master
# Copyright (c) typedef int GmbH. Licensed under EUPLv1.2.
#
###############################################################################

import re

from autobahn.wamp.types import RegisterOptions, SubscribeOptions
from txaio import make_logger

__all__ = ("RemoteWampApi",)


[docs] class RemoteWampApi(object): """ Exposes the WAMP meta API on a router realm from a remote node on CFC. This is only available if the router realm is started with option.bridge_meta_api == True. Events are published locally on the node WAMP meta API like [1] wamp.session.on_join (session_id, session_details) and (if bridged), on the local node management API [2] crossbar.worker.<worker_id>.realm.<realm_id>.root.wamp.session.on_join (session_id, session_details) which then get forward to CFC as [3] crossbarfabriccenter.node.<node_id>.worker.<worker_id>.realm.<realm_id>.root. (session_id, session_details) which is then republished by CFC as [4] crossbarfabriccenter.remote.realm.meta.wamp.session.on_join (node_id, worker_id, realm_id, session_id, session_details) For example: [1] wamp.session.on_join [2] crossbar.worker.worker-001.realm.realm-001.root.wamp.session.on_join [3] crossbarfabriccenter.node.cf1.worker.worker-001.realm.realm-001.root.wamp.session.on_join [4] crossbarfabriccenter.remote.realm.meta.wamp.session.on_join """
[docs] log = make_logger()
[docs] PREFIX = "crossbarfabriccenter.remote.realm.meta."
[docs] def register(self, session): try: def forward(node_id, worker_id, realm_id, *args, **kwargs): details = kwargs.pop("details", None) proc = details.procedure[len(self.PREFIX) :] uri = "crossbarfabriccenter.node.{node_id}.worker.{worker_id}.realm.{realm_id}.root.{proc}".format( node_id=node_id, worker_id=worker_id, realm_id=realm_id, proc=proc ) self.log.debug( 'Forwarding CFC remote WAMP meta API call <{proc}> on management realm "{mrealm}" to node_id "{node_id}", worker_id "{worker_id}", realm_id "{realm_id}"', node_id=node_id, worker_id=worker_id, realm_id=realm_id, proc=details.procedure, mrealm=session._realm, ) return session.call(uri, *args, **kwargs) reg = session.register( forward, self.PREFIX, options=RegisterOptions(match="prefix", details_arg="details") ) return [reg] except: self.log.failure() return []
[docs] def subscribe(self, session): remote_uri = "crossbarfabriccenter.node..worker..realm..root." remote_uri_regex = r"^crossbarfabriccenter.node.([a-z0-9][a-z0-9_\-]*).worker.([a-z0-9][a-z0-9_\-]*).realm.([a-z0-9][a-z0-9_\-]*).root.(\S*)$" remote_uri_pat = re.compile(remote_uri_regex) def forward(*args, **kwargs): try: details = kwargs.pop("details", None) if details: match = remote_uri_pat.match(details.topic) if match and len(match.groups()) == 4: node_id, worker_id, realm_id, suffix_uri = match.groups() # reverse our hack: see crossbar.router.service.RouterServiceSession.publish suffix_uri = suffix_uri.replace("-", ".") local_uri = "{}{}".format(self.PREFIX, suffix_uri) self.log.debug( 'RemoteWampApi.forward("{topic}") -> "{local_uri}"', topic=details.topic, local_uri=local_uri, ) return session.publish(local_uri, node_id, worker_id, realm_id, *args, **kwargs) # should not arrive here session.log.warn( "received unexpected WAMP meta event to forward for management API: details={details}", details=details, ) except: session.log.failure() sub = session.subscribe(forward, remote_uri, SubscribeOptions(match="wildcard", details=True)) return [sub]