Source code for crossbar.worker.router

#####################################################################################
#
#  Copyright (c) typedef int GmbH
#  SPDX-License-Identifier: EUPL-1.2
#
#####################################################################################

from pprint import pformat
from typing import Dict
from uuid import uuid4

from autobahn import wamp
from autobahn.util import utcstr
from autobahn.wamp.exception import ApplicationError
from autobahn.wamp.message import identify_realm_name_category
from autobahn.wamp.types import CallDetails, ComponentConfig, PublishOptions, SessionIdent
from twisted.internet.defer import Deferred, DeferredList, inlineCallbacks, maybeDeferred, returnValue, succeed
from twisted.python.failure import Failure

from crossbar._util import class_name, hlid, hltype, hlval
from crossbar.router.router import RouterFactory
from crossbar.router.service import RouterServiceAgent
from crossbar.router.session import RouterSessionFactory
from crossbar.worker import _appsession_loader
from crossbar.worker.controller import WorkerController
from crossbar.worker.rlink import RLinkConfig
from crossbar.worker.transport import RouterTransport, TransportController
from crossbar.worker.types import RouterComponent, RouterRealm, RouterRealmRole

__all__ = ("RouterController",)


[docs] class RouterController(TransportController): """ A native Crossbar.io worker that runs a WAMP router which can manage multiple realms, run multiple transports and links, as well as host multiple (embedded) application components. """
[docs] WORKER_TYPE = "router"
[docs] WORKER_TITLE = "Router"
[docs] router_realm_class = RouterRealm
[docs] router_factory_class = RouterFactory
def __init__(self, config=None, reactor=None, personality=None): super(RouterController, self).__init__( config=config, reactor=reactor, personality=personality, ) # factory for producing (per-realm) routers
[docs] self._router_factory = self.router_factory_class(self.config.extra.node, self.config.extra.worker, self)
# factory for producing router sessions
[docs] self._router_session_factory = RouterSessionFactory(self._router_factory)
# map: realm ID -> RouterRealm
[docs] self.realms: Dict[str, RouterRealm] = {}
# map: realm URI name -> realm ID
[docs] self.realm_to_id: Dict[str, str] = {}
[docs] self._service_sessions = {}
# map: component ID -> RouterComponent
[docs] self.components: Dict[str, RouterComponent] = {}
# "global" shared between all components
[docs] self.components_shared = {"reactor": reactor}
# map: transport ID -> RouterTransport
[docs] self.transports: Dict[str, RouterTransport] = {}
[docs] def realm_by_name(self, name): realm_id = self.realm_to_id.get(name, None) assert realm_id in self.realms return self.realms[realm_id]
@property
[docs] def router_factory(self): """ :return: The router factory used for producing (per-realm) routers. """ return self._router_factory
@property
[docs] def router_session_factory(self): """ :return: The router session factory for producing router sessions. """ return self._router_session_factory
[docs] def onWelcome(self, msg): # this is a hook for authentication methods to deny the # session after the Welcome message -- do we need to do # anything in this impl? pass
@inlineCallbacks
[docs] def onJoin(self, details, publish_ready=True): """ Called when worker process has joined the node's management realm. """ self.log.info( 'Router worker session for "{worker_id}" joined realm "{realm}" on node router {method}', realm=self._realm, worker_id=self._worker_id, session_id=details.session, method=hltype(RouterController.onJoin), ) yield WorkerController.onJoin(self, details, publish_ready=False) # WorkerController.publish_ready() self.publish_ready() self.log.info('Router worker session for "{worker_id}" ready', worker_id=self._worker_id)
[docs] def onLeave(self, details): # when this router is shutting down, we disconnect all our # components so that they have a chance to shutdown properly # -- e.g. on a ctrl-C of the router. leaves = [] if self.components: for component in self.components.values(): if component.session.is_connected(): d = maybeDeferred(component.session.leave) def done(_): self.log.info( "component '{component_id}' disconnected", component_id=component.id, ) component.session.disconnect() d.addCallback(done) leaves.append(d) dl = DeferredList(leaves, consumeErrors=True) # we want our default behavior, which disconnects this # router-worker, effectively shutting it down .. but only # *after* the components got a chance to shutdown. dl.addBoth(lambda _: super(RouterController, self).onLeave(details))
@wamp.register(None)
[docs] def get_router_realms(self, details=None): """ Get realms currently running on this router worker. :param details: Call details. :type details: :class:`autobahn.wamp.types.CallDetails` :returns: List of realms currently running. :rtype: list[str] """ self.log.debug("{name}.get_router_realms", name=self.__class__.__name__) return sorted(self.realms.keys())
@wamp.register(None)
[docs] def get_router_realm(self, realm_id, details=None): """ Return realm detail information. :param realm_id: Realm ID within router worker. :type realm_id: str :param details: Call details. :type details: :class:`autobahn.wamp.types.CallDetails` :returns: realm information object :rtype: dict """ self.log.debug("{name}.get_router_realm(realm_id={realm_id})", name=self.__class__.__name__, realm_id=realm_id) if realm_id not in self.realms: raise ApplicationError("crossbar.error.no_such_object", "No realm with ID '{}'".format(realm_id)) return self.realms[realm_id].marshal()
@wamp.register(None)
[docs] def get_router_realm_by_name(self, realm_name, details=None): """ Return realm detail information. :param realm_name: Realm name. :type realm_name: str :param details: Call details. :type details: :class:`autobahn.wamp.types.CallDetails` :returns: realm information object :rtype: dict """ self.log.debug( '{klass}.get_router_realm_by_name(realm_name="{realm_name}")', klass=self.__class__.__name__, realm_name=realm_name, ) if realm_name not in self.realm_to_id: raise ApplicationError("crossbar.error.no_such_object", 'No realm with name "{}"'.format(realm_name)) return self.realms[self.realm_to_id[realm_name]].marshal()
@wamp.register(None)
[docs] def get_router_realm_stats(self, realm_id=None, details=None): """ Return realm messaging statistics. :param details: Call details. :type details: :class:`autobahn.wamp.types.CallDetails` :returns: realm statistics object :rtype: dict """ self.log.debug( "{name}.get_router_realm_stats(realm_id={realm_id})", name=self.__class__.__name__, realm_id=realm_id ) if realm_id is not None and realm_id not in self.realms: raise ApplicationError("crossbar.error.no_such_object", "No realm with ID '{}'".format(realm_id)) if realm_id: realm_ids = [realm_id] else: realm_ids = self.realms.keys() res = {} for realm_id in realm_ids: realm = self.realms[realm_id] if realm.router: res[realm_id] = realm.router.stats() return res
@wamp.register(None) @inlineCallbacks
[docs] def start_router_realm(self, realm_id, realm_config, details=None): """ Starts a realm on this router worker. The minimum configuration must contain the realm name: .. code-block:: python { "name": "realm1" } The configuration can also configure one or more roles, including configuration of role permissions: .. code-block:: python { "name": "realm1", "roles": [{ "name": "anonymous", "permissions": [{ "uri": "", "match": "prefix", "allow": { "call": True, "register": True, "publish": True, "subscribe": True }, "disclose": { "caller": True, "publisher": True }, "cache": True }] }] } :param realm_id: The ID of the realm to start. :type realm_id: str :param realm_config: The realm configuration. :type realm_config: dict :param details: Call details. :type details: :class:`autobahn.wamp.types.CallDetails` """ self.log.info( "Starting router realm {realm_id} {method}", realm_id=hlid(realm_id), method=hltype(RouterController.start_router_realm), ) # prohibit starting a realm twice # if realm_id in self.realms: emsg = "Could not start realm: a realm with ID '{}' is already running (or starting)".format(realm_id) self.log.error(emsg) raise ApplicationError("crossbar.error.already_running", emsg) # check configuration # try: self.personality.check_router_realm(self.personality, realm_config) except Exception as e: emsg = "Invalid router realm configuration: {}".format(e) self.log.error(emsg) raise ApplicationError("crossbar.error.invalid_configuration", emsg) # URI of the realm to start realm_name = realm_config["name"] # Category of the realm to start, as determined # from realm name: "standalone", "eth", "ens", "reverse_ens" or None realm_category = identify_realm_name_category(realm_name) if realm_category is None: emsg = 'Invalid router realm configuration: the name "{}" is not a valid WAMP realm name'.format( realm_name ) self.log.error(emsg) raise ApplicationError("crossbar.error.invalid_configuration", emsg) else: self.log.info( '{func} starting {realm_category}-realm with WAMP realm name "{realm_name}" ' "using router local realm_id {realm_id}", func=hltype(self.start_router_realm), realm_name=hlid(realm_name), realm_id=hlid(realm_id), realm_category=hlval(realm_category.upper(), color="magenta"), ) # router/realm wide options options = realm_config.get("options", {}) enable_meta_api = options.get("enable_meta_api", True) # expose router/realm service API additionally on local node management router bridge_meta_api = options.get("bridge_meta_api", False) if bridge_meta_api: # FIXME bridge_meta_api_prefix = "crossbar.worker.{worker_id}.realm.{realm_id}.root.".format( worker_id=self._worker_id, realm_id=realm_id ) else: bridge_meta_api_prefix = None # track realm rlm = self.router_realm_class(self, realm_id, realm_config, realm_category) self.realms[realm_id] = rlm self.realm_to_id[realm_name] = realm_id # create a new router for the realm rlm.router = self._router_factory.start_realm(rlm) if rlm.router._store and hasattr(rlm.router._store, "start"): yield rlm.router._store.start() # add a router/realm service session extra = { # the realm category, one of ["standalone", "eth", "ens", "reverse_ens"] "category": realm_category, # the RouterServiceAgent will fire this in onJoin() when it is ready "onready": Deferred(), # if True, forward the WAMP meta API (implemented by RouterServiceAgent) # that is normally only exposed on the app router/realm _additionally_ # to the local node management router. "enable_meta_api": enable_meta_api, "bridge_meta_api": bridge_meta_api, "bridge_meta_api_prefix": bridge_meta_api_prefix, # the management session on the local node management router to which # the WAMP meta API is exposed to additionally, when the bridge_meta_api option is set "management_session": self, } # WAMP session configuration for service agent (WAMP meta API) cfg = ComponentConfig(realm_name, extra) # wamp meta api only allowed for "trusted" sessions svc_authrole = "trusted" # each worker is run under its own dedicated WAMP auth role # svc_authrole = 'crossbar.worker.{}'.format(self._worker_id) svc_authid = "routerworker-{}-realm-{}-serviceagent".format(self._worker_id, realm_id) # create WAMP session for the service agent (WAMP meta API) and store on realm object rlm.session = RouterServiceAgent(cfg, rlm.router) # add the service agent session directly on the router (under the respective authid/authrole) self._router_session_factory.add(rlm.session, rlm.router, authid=svc_authid, authrole=svc_authrole) # set the service agent (WAMP meta API) session on the realm container self.set_service_session(rlm.session, realm_name, authrole=svc_authrole) # already fired at the end of RouterServiceAgent.onJoin # yield extra['onready'] self.log.info( 'RouterServiceAgent started on realm="{realm_name}" with authrole="{authrole}", authid="{authid}"', realm_name=realm_name, authrole=svc_authrole, authid=svc_authid, ) self.publish("{}.on_realm_started".format(self._uri_prefix), realm_id) topic = "{}.on_realm_started".format(self._uri_prefix) event = rlm.marshal() caller = details.caller if details else None self.publish(topic, event, options=PublishOptions(exclude=caller)) self.log.info( 'Realm "{realm_id}" (category="{realm_category}", name="{realm_name}", authrole="{authrole}", authid="{authid}") started', realm_id=realm_id, realm_category=realm_category, realm_name=rlm.session._realm, authrole=svc_authrole, authid=svc_authid, ) return event
@wamp.register(None) @inlineCallbacks
[docs] def stop_router_realm(self, realm_id, details=None): """ Stop a realm currently running on this router worker. When a realm has stopped, no new session will be allowed to attach to the realm. Optionally, close all sessions currently attached to the realm. :param id: ID of the realm to stop. :type id: str :param details: Call details. :type details: :class:`autobahn.wamp.types.CallDetails` """ self.log.info("{name}.stop_router_realm", name=self.__class__.__name__) if realm_id not in self.realms: raise ApplicationError("crossbar.error.no_such_object", "No realm with ID '{}'".format(realm_id)) rlm = self.realms[realm_id] realm_name = rlm.config["name"] # stop the RouterServiceAgent living on the realm yield rlm.session.leave() self._router_session_factory.remove(rlm.session) self.log.info('RouterServiceAgent stopped on realm "{realm_name}"', realm_name=realm_name) detached_sessions = self._router_factory.stop_realm(realm_name) del self.realms[realm_id] del self.realm_to_id[realm_name] realm_stopped = {"id": realm_id, "name": realm_name, "detached_sessions": sorted(detached_sessions)} self.publish("{}.on_realm_stopped".format(self._uri_prefix), realm_id) returnValue(realm_stopped)
[docs] def has_realm(self, realm: str) -> bool: """ Check if a realm with the given name is currently running. :param realm: Realm name (_not_ ID). :type realm: str :returns: True if realm is running. :rtype: bool """ result = realm in self.realm_to_id and self.realm_to_id[realm] in self.realms self.log.debug( '{func}(realm="{realm}") -> {result}', func=hltype(RouterController.has_realm), realm=hlid(realm), result=hlval(result), ) return result
[docs] def has_role(self, realm: str, authrole: str) -> bool: """ Check if a role with the given name is currently running in the given realm. :param realm: WAMP realm (name, _not_ run-time ID). :type realm: str :param authrole: WAMP authentication role (URI, _not_ run-time ID). :type authrole: str :returns: True if realm is running. :rtype: bool """ authrole = authrole or "trusted" result = realm in self.realm_to_id and self.realm_to_id[realm] in self.realms if result: realm_id = self.realm_to_id[realm] result = ( authrole in self.realms[realm_id].role_to_id and self.realms[realm_id].role_to_id[authrole] in self.realms[realm_id].roles ) # note: this is to enable eg built-in "trusted" authrole result = result or authrole in self._service_sessions[realm] self.log.debug( '{func}(realm="{realm}", authrole="{authrole}") -> {result}', func=hltype(RouterController.has_role), realm=hlid(realm), authrole=hlid(authrole), result=hlval(result), ) return result
[docs] def set_service_session(self, session, realm, authrole): authrole = authrole or "trusted" if realm not in self._service_sessions: self._service_sessions[realm] = {} self._service_sessions[realm][authrole] = session self.log.info( '{func}(session={session}, realm="{realm}", authrole="{authrole}")', func=hltype(self.set_service_session), session=session, realm=hlid(realm), authrole=hlid(authrole), )
[docs] def get_service_session(self, realm, authrole): authrole = authrole or "trusted" session = None if realm in self._service_sessions: if authrole in self._service_sessions[realm]: session = self._service_sessions[realm][authrole] self.log.debug( '{func}(realm="{realm}", authrole="{authrole}") -> {session}', func=hltype(self.get_service_session), session=session, realm=hlid(realm), authrole=hlid(authrole), ) return succeed(session)
@wamp.register(None)
[docs] def get_router_realm_roles(self, realm_id, details=None): """ Get roles currently running on a realm running on this router worker. :param realm_id: The ID of the realm to list roles for. :type realm_id: str :param details: Call details. :type details: :class:`autobahn.wamp.types.CallDetails` :returns: A list of roles. :rtype: list[dict] """ self.log.debug("{name}.get_router_realm_roles({realm_id})", name=self.__class__.__name__, realm_id=realm_id) if realm_id not in self.realms: raise ApplicationError("crossbar.error.no_such_object", "No realm with ID '{}'".format(realm_id)) return self.realms[realm_id].roles.values()
@wamp.register(None)
[docs] def get_router_realm_role(self, realm_id, role_id, details=None): """ Return role detail information. :param realm_id: The ID of the realm to get a role for. :type realm_id: str :param role_id: The ID of the role to get. :type role_id: str :param details: Call details. :type details: :class:`autobahn.wamp.types.CallDetails` :returns: role information object :rtype: dict """ self.log.debug( "{name}.get_router_realm_role(realm_id={realm_id}, role_id={role_id})", name=self.__class__.__name__, realm_id=realm_id, role_id=role_id, ) if realm_id not in self.realms: raise ApplicationError("crossbar.error.no_such_object", "No realm with ID '{}'".format(realm_id)) if role_id not in self.realms[realm_id].roles: raise ApplicationError( "crossbar.error.no_such_object", "No role with ID '{}' on realm '{}'".format(role_id, realm_id) ) return self.realms[realm_id].roles[role_id].marshal()
@wamp.register(None)
[docs] def start_router_realm_role(self, realm_id, role_id, role_config, details=None): """ Start a role on a realm running on this router worker. :param id: The ID of the realm the role should be started on. :type id: str :param role_id: The ID of the role to start under. :type role_id: str :param config: The role configuration. :type config: dict :param details: Call details. :type details: :class:`autobahn.wamp.types.CallDetails` """ self.log.debug( 'Starting role "{role_id}" on realm "{realm_id}" {method}', role_id=role_id, realm_id=realm_id, method=hltype(self.start_router_realm_role), ) if realm_id not in self.realms: raise ApplicationError("crossbar.error.no_such_object", "No realm with ID '{}'".format(realm_id)) if role_id in self.realms[realm_id].roles: raise ApplicationError( "crossbar.error.already_exists", "A role with ID '{}' already exists in realm with ID '{}'".format(role_id, realm_id), ) realm = self.realms[realm_id].config["name"] role = RouterRealmRole(role_id, role_config) role_name = role.config["name"] if role_name in self.realms[realm_id].role_to_id: raise ApplicationError( "crossbar.error.already_exists", "A role with name '{}' already exists in realm with ID '{}'".format(role_name, realm_id), ) self.realms[realm_id].roles[role_id] = role self.realms[realm_id].role_to_id[role_name] = role_id self._router_factory.add_role(realm, role_config) topic = "{}.on_router_realm_role_started".format(self._uri_prefix) event = self.realms[realm_id].roles[role_id].marshal() caller = details.caller if details else None self.publish(topic, event, options=PublishOptions(exclude=caller)) self.log.debug( 'Role {role_id} named "{role_name}" started on realm "{realm}" with configuration=\n{role_config}', role_id=hlid(role_id), role_name=hlid(role_name), realm=hlid(realm), role_config=pformat(role_config), func=hltype(self.start_router_realm_role), ) return event
@wamp.register(None)
[docs] def stop_router_realm_role(self, realm_id, role_id, details=None): """ Stop a role currently running on a realm running on this router worker. :param realm_id: The ID of the realm of the role to be stopped. :type realm_id: str :param role_id: The ID of the role to be stopped. :type role_id: str :param details: Call details. :type details: :class:`autobahn.wamp.types.CallDetails` """ self.log.debug("{name}.stop_router_realm_role", name=self.__class__.__name__) if realm_id not in self.realms: raise ApplicationError("crossbar.error.no_such_object", "No realm with ID '{}'".format(realm_id)) if role_id not in self.realms[realm_id].roles: raise ApplicationError( "crossbar.error.no_such_object", "No role with ID '{}' in realm with ID '{}'".format(role_id, realm_id) ) role = self.realms[realm_id].roles.pop(role_id) del self.realms[realm_id].role_to_id[role.config["name"]] topic = "{}.on_router_realm_role_stopped".format(self._uri_prefix) event = role.marshal() caller = details.caller if details else None self.publish(topic, event, options=PublishOptions(exclude=caller)) self.log.info("role {role_id} on realm {realm_id} stopped", realm_id=realm_id, role_id=role_id) return event
@wamp.register(None)
[docs] def get_router_components(self, details=None): """ Get app components currently running in this router worker. :param details: Call details. :type details: :class:`autobahn.wamp.types.CallDetails` :returns: List of app components currently running. :rtype: list[dict] """ self.log.debug("{name}.get_router_components", name=self.__class__.__name__) res = [] for component in sorted(self.components.values(), key=lambda c: c.created): res.append( { "id": component.id, "created": utcstr(component.created), "config": component.config, } ) return res
@wamp.register(None)
[docs] def get_router_component(self, component_id, details=None): """ Get details about a router component :param component_id: The ID of the component to get :type component_id: str :param details: Call details. :type details: :class:`autobahn.wamp.types.CallDetails` :returns: Details of component :rtype: dict """ self.log.debug( "{name}.get_router_component({component_id})", name=self.__class__.__name__, component_id=component_id ) if component_id in self.components: return self.components[component_id].marshal() else: raise ApplicationError("crossbar.error.no_such_object", "No component {}".format(component_id))
@wamp.register(None)
[docs] def start_router_component(self, component_id, config, details=None): """ Start an app component in this router worker. :param component_id: The ID of the component to start. :type component_id: str :param config: The component configuration. :type config: dict :param details: Call details. :type details: :class:`autobahn.wamp.types.CallDetails` """ self.log.debug("{name}.start_router_component", name=self.__class__.__name__) # prohibit starting a component twice # if component_id in self.components: emsg = "Could not start component: a component with ID '{}' is already running (or starting)".format( component_id ) self.log.error(emsg) raise ApplicationError("crossbar.error.already_running", emsg) started_d = Deferred() # check configuration # try: self.personality.check_router_component(self.personality, config) except Exception as e: emsg = "Invalid router component configuration: {}".format(e) self.log.error(emsg) raise ApplicationError("crossbar.error.invalid_configuration", emsg) else: self.log.debug("Starting {type}-component on router.", type=config["type"]) # resolve references to other entities # references = {} for ref in config.get("references", []): ref_type, ref_id = ref.split(":") if ref_type == "connection": if ref_id in self._connections: references[ref] = self._connections[ref_id] else: emsg = "cannot resolve reference '{}' - no '{}' with ID '{}'".format(ref, ref_type, ref_id) self.log.error(emsg) raise ApplicationError("crossbar.error.invalid_configuration", emsg) else: emsg = "cannot resolve reference '{}' - invalid reference type '{}'".format(ref, ref_type) self.log.error(emsg) raise ApplicationError("crossbar.error.invalid_configuration", emsg) # create component config # realm = config.get("realm", None) assert isinstance(realm, str) extra = config.get("extra", {}) assert isinstance(extra, dict) # forward crossbar node base directory extra["cbdir"] = self.config.extra.cbdir # allow access to controller session controller = self if self.config.extra.expose_controller else None # expose an object shared between components shared = self.components_shared if self.config.extra.expose_shared else None # this is the component configuration provided to the components ApplicationSession component_config = ComponentConfig( realm=realm, extra=extra, keyring=None, controller=controller, shared=shared ) # define component ctor function try: create_component = _appsession_loader(config) except ApplicationError as e: # for convenience, also log failed component loading self.log.error("component loading failed", log_failure=Failure()) if "No module named" in str(e): self.log.error(" Python module search paths:") for path in e.kwargs["pythonpath"]: self.log.error(" {path}", path=path) raise # check component extra configuration # if hasattr(create_component, "check_config") and callable(create_component.check_config) and extra: try: create_component.check_config(self.personality, extra) except Exception as e: emsg = "invalid router component extra configuration: {}".format(e) self.log.debug(emsg) raise ApplicationError("crossbar.error.invalid_configuration", emsg) else: self.log.debug('starting router component "{component_id}" ..', component_id=component_id) # create and add an WAMP application session to run the component next to the router try: session = create_component(component_config) # any exception spilling out from user code in onXXX handlers is fatal! def panic(fail, msg): self.log.error("Fatal error in component: {msg} - {log_failure.value}", msg=msg, log_failure=fail) session.disconnect() session._swallow_error = panic except Exception: self.log.error( "Component instantiation failed", log_failure=Failure(), ) raise # Note that 'join' is fired to listeners *before* onJoin runs, # so if you do 'yield self.leave()' in onJoin we'll still # publish "started" before "stopped". def publish_stopped(session, stop_details): self.log.info( "stopped component: {session} id={session_id}", session=class_name(session), session_id=session._session_id, ) topic = self._uri_prefix + ".on_component_stop" event = {"id": component_id} caller = details.caller if details else None self.publish(topic, event, options=PublishOptions(exclude=caller)) if not started_d.called: started_d.errback(Exception("Session left before being ready")) return event def publish_ready(session): """ when our component is ready, we publish .on_component_ready """ self.log.info( "component ready: {session} id={session_id}", session=class_name(session), session_id=session._session_id, ) topic = self._uri_prefix + ".on_component_ready" event = {"id": component_id} self.publish(topic, event) started_d.callback(event) return event def publish_started(session, start_details): """ when our component starts, we publish .on_component_start """ # hook up handlers for "session is ready" session.on("ready", publish_ready) # publish .on_component_start self.log.info( "started component: {session} id={session_id}", session=class_name(session), session_id=session._session_id, ) topic = self._uri_prefix + ".on_component_start" event = {"id": component_id} caller = details.caller if details else None self.publish(topic, event, options=PublishOptions(exclude=caller)) return event session.on("leave", publish_stopped) session.on("join", publish_started) self.components[component_id] = RouterComponent(component_id, config, session) router = self._router_factory.get(realm) self._router_session_factory.add( session, router, authrole=config.get("role", "anonymous"), authid=uuid4().__str__() ) self.log.debug( "Added component {component_id} (type '{name}')", component_id=component_id, name=class_name(session), ) return started_d
@wamp.register(None)
[docs] def stop_router_component(self, component_id, details=None): """ Stop an app component currently running in this router worker. :param component_id: The ID of the component to stop. :type component_id: str :param details: Call details. :type details: :class:`autobahn.wamp.types.CallDetails` """ self.log.debug( "{name}.stop_router_component({component_id})", name=self.__class__.__name__, component_id=component_id ) if component_id in self.components: self.log.debug( "Worker {worker}: stopping component {component_id}", worker=self.config.extra.worker, component_id=component_id, ) try: # self._components[id].disconnect() self._session_factory.remove(self.components[component_id]) del self.components[component_id] except Exception as e: raise ApplicationError( "crossbar.error.cannot_stop", "Failed to stop component {}: {}".format(component_id, e) ) else: raise ApplicationError("crossbar.error.no_such_object", "No component {}".format(component_id))
@wamp.register(None)
[docs] def get_router_transports(self, details=None): """ Get transports currently running in this router worker. :param details: Call details. :type details: :class:`autobahn.wamp.types.CallDetails` :returns: List of transports currently running. :rtype: list[dict] """ self.log.debug("{name}.get_router_transports", name=self.__class__.__name__) res = [] for transport in sorted(self.transports.values(), key=lambda c: c.created): res.append(transport.marshal()) return res
@wamp.register(None)
[docs] def get_router_transport(self, transport_id, details=None): """ Get transports currently running in this router worker. :param details: Call details. :type details: :class:`autobahn.wamp.types.CallDetails` :returns: List of transports currently running. :rtype: list[dict] """ self.log.debug("{name}.get_router_transport", name=self.__class__.__name__) if transport_id in self.transports: transport = self.transports[transport_id] obj = transport.marshal() return obj else: raise ApplicationError("crossbar.error.no_such_object", "No transport {}".format(transport_id))
@wamp.register(None)
[docs] def start_router_transport(self, transport_id, config, create_paths=False, details=None): """ Start a transport on this router worker. :param transport_id: The ID of the transport to start. :type transport_id: str :param config: The transport configuration. :type config: dict :param create_paths: If set, start subservices defined in the configuration too. This currently only applies to Web services, which are part of a Web transport. :type create_paths: bool :param details: Call details. :type details: :class:`autobahn.wamp.types.CallDetails` """ self.log.info( 'Starting router transport "{transport_id}" {method}', transport_id=transport_id, method=hltype(self.start_router_transport), ) # prohibit starting a transport twice if transport_id in self.transports: _emsg = 'Could not start transport: a transport with ID "{}" is already running (or starting)'.format( transport_id ) self.log.error(_emsg) raise ApplicationError("crossbar.error.already_running", _emsg) # create a transport and parse the transport configuration router_transport = self.personality.create_router_transport(self, transport_id, config) caller = details.caller if details else None event = {"id": transport_id} topic = "{}.on_router_transport_starting".format(self._uri_prefix) self.publish(topic, event, options=PublishOptions(exclude=caller)) # start listening .. d = router_transport.start(create_paths) def ok(_): self.transports[transport_id] = router_transport if config["endpoint"]["type"] == "tcp": endpoint = "TCP port {}".format(config["endpoint"]["port"]) if "portrange" in config["endpoint"]: transport_type = "TCP/{} transport".format(config["endpoint"]["portrange"]) else: transport_type = "TCP/{} transport".format(config["endpoint"]["port"]) elif config["endpoint"]["type"] == "unix": endpoint = 'UDS path "{}"'.format(config["endpoint"]["path"]) transport_type = "Unix domain socket transport" else: endpoint = "unknown" transport_type = "unknown" self.log.info( 'Router {transport_type} started as transport "{transport_id}" and listening on {endpoint}', transport_type=hlval(transport_type), transport_id=hlid(transport_id), endpoint=hlval(endpoint), ) topic = "{}.on_router_transport_started".format(self._uri_prefix) self.publish(topic, event, options=PublishOptions(exclude=caller)) return router_transport.marshal() def fail(err): _emsg = "Cannot listen on transport endpoint: {log_failure}" self.log.error(_emsg, log_failure=err) topic = "{}.on_router_transport_stopped".format(self._uri_prefix) self.publish(topic, event, options=PublishOptions(exclude=caller)) raise ApplicationError("crossbar.error.cannot_listen", _emsg) d.addCallbacks(ok, fail) return d
@wamp.register(None)
[docs] def stop_router_transport(self, transport_id, details=None): """ Stop a transport currently running in this router worker. :param transport_id: The ID of the transport to stop. :type transport_id: str :param details: Call details. :type details: :class:`autobahn.wamp.types.CallDetails` """ self.log.debug("{name}.stop_router_transport", name=self.__class__.__name__) if ( transport_id not in self.transports or self.transports[transport_id].state != self.personality.RouterTransport.STATE_STARTED ): emsg = "Cannot stop transport: no transport with ID '{}' or transport is already stopping".format( transport_id ) self.log.error(emsg) raise ApplicationError("crossbar.error.not_running", emsg) router_transport = self.transports[transport_id] self.log.debug("Stopping transport with ID '{transport_id}'", transport_id=transport_id) caller = details.caller if details else None event = router_transport.marshal() topic = "{}.on_router_transport_stopping".format(self._uri_prefix) self.publish(topic, event, options=PublishOptions(exclude=caller)) # stop listening .. d = router_transport.stop() def ok(_): del self.transports[transport_id] topic = "{}.on_router_transport_stopped".format(self._uri_prefix) self.publish(topic, event, options=PublishOptions(exclude=caller)) return event def fail(err): emsg = "Cannot stop listening on transport endpoint: {log_failure}" self.log.error(emsg, log_failure=err) raise ApplicationError("crossbar.error.cannot_stop", emsg) d.addCallbacks(ok, fail) return d
@wamp.register(None)
[docs] def kill_by_authid(self, realm_id, authid, reason, message=None, details=None): self.log.info( 'Killing sessions by authid="{authid}" ..', realm_id=hlid(realm_id), authid=hlid(authid), method=hltype(RouterController.start_router_realm), ) if realm_id not in self.realms: raise ApplicationError("crossbar.error.no_such_object", "No realm with ID '{}'".format(realm_id)) # forward call directly to service agent return self.realms[realm_id].session.session_kill_by_authid(authid, reason, message=message, details=details)
@wamp.register(None) @wamp.register(None) @wamp.register(None) @inlineCallbacks @wamp.register(None) @inlineCallbacks