# Source code for crossbar.worker.rlink

#####################################################################################
#
#  Copyright (c) typedef int GmbH
#  SPDX-License-Identifier: EUPL-1.2
#
#####################################################################################

import copy
import pprint
from collections.abc import Mapping, Sequence
from typing import Dict

from autobahn import util
from autobahn.twisted.wamp import ApplicationRunner, ApplicationSession
from autobahn.util import hl, hlid, hltype, hluserid, hlval
from autobahn.wamp.exception import ApplicationError, TransportLost
from autobahn.wamp.message import Event, Invocation
from autobahn.wamp.types import (
    CallOptions,
    ComponentConfig,
    PublishOptions,
    RegisterOptions,
    SessionIdent,
    SubscribeOptions,
)
from twisted.internet.defer import Deferred, inlineCallbacks
from txaio import make_logger, time_ns

from crossbar.common.checkconfig import check_connecting_transport, check_dict_args, check_realm_name
from crossbar.common.twisted.endpoint import create_connecting_endpoint_from_config

__all__ = (
    "RLink",
    "RLinkConfig",
    "RLinkManager",
)


class BridgeSession(ApplicationSession):
    """
    Common base class for both legs of a router-to-router link (rlink).

    A bridge session watches the subscriptions and registrations on the router it
    is joined to (using the ``wamp.subscription.*`` and ``wamp.registration.*``
    meta procedures/topics) and mirrors them onto the ``other`` session. Events
    and invocations then received on ``other`` are re-published / re-called
    through this session.

    The forwarding code reads the class attributes ``IS_REMOTE_LEG`` and ``DIR``,
    which are not defined here and must be provided by subclasses.
    """

    log = make_logger()

    def __init__(self, config):
        """
        :param config: Session configuration, passed through to ``ApplicationSession``.
        """
        ApplicationSession.__init__(self, config)

        # subscription-id on this router -> copy of the subscription details, extended
        # with a "sub" entry holding the forwarding subscription created on the other
        # session (None while that subscription has not been confirmed yet)
        self._subs = {}

        # registration-id on this router -> copy of the registration details, extended
        # with a "reg" entry holding the forwarding registration created on the other
        # session (None while that registration has not been confirmed yet)
        self._regs = {}

        # passed as exclude_authid / exclude_authrole when re-publishing forwarded events
        self._exclude_authid = None
        self._exclude_authrole = None

    def onMessage(self, msg):
        """
        Unpack router-internal identity information before normal message processing.

        When ``msg._router_internal`` is set, its three values are copied into the
        publisher fields (for ``Event``) or caller fields (for ``Invocation``) of the
        message, which is then handed to ``ApplicationSession.onMessage``.
        """
        if msg._router_internal is not None:
            if isinstance(msg, Event):
                msg.publisher, msg.publisher_authid, msg.publisher_authrole = msg._router_internal
            elif isinstance(msg, Invocation):
                msg.caller, msg.caller_authid, msg.caller_authrole = msg._router_internal
        return super(BridgeSession, self).onMessage(msg)

    @inlineCallbacks
    def _setup_event_forwarding(self, other):
        """
        Mirror the subscriptions of this session's router onto ``other`` and re-publish
        events received on ``other`` through this session.

        :param other: The session forming the opposite leg of the link.
        """
        self.log.debug(
            "setup event forwarding between {me} and {other} (exclude_authid={exclude_authid}, exclude_authrole={exclude_authrole})",
            exclude_authid=self._exclude_authid,
            exclude_authrole=self._exclude_authrole,
            me=self._session_id,
            other=other,
        )

        @inlineCallbacks
        def on_subscription_create(sub_session, sub_details, details=None):
            """
            Event handler fired when a new subscription was created on this router.

            The handler will then also subscribe on the other router, and when receiving
            events, re-publish those on this router.

            :param sub_session: Session the subscription was created for (only logged here).
            :param sub_details: Subscription details; the ``"uri"`` and ``"id"`` keys are read.
            :param details: Event details of the meta event (only logged here).
            :return: None
            """
            # never forward subscriptions to WAMP meta topics
            if sub_details["uri"].startswith("wamp."):
                return

            sub_id = sub_details["id"]

            if sub_id in self._subs and self._subs[sub_id]["sub"]:
                # This will happen if, partway through the subscription process, the RLink disconnects
                self.log.error(
                    "on_subscription_create: sub ID {sub_id} already in map {method}",
                    sub_id=sub_id,
                    method=hltype(BridgeSession._setup_event_forwarding),
                )
                return

            # record the subscription as "pending" (sub=None) until the other router answers
            sub_details_local = copy.deepcopy(sub_details)
            if sub_id not in self._subs:
                sub_details_local["sub"] = None
                self._subs[sub_id] = sub_details_local

            uri = sub_details["uri"]

            # one-element list used as a mutable flag, so that the generic publish
            # failure below is only logged once per forwarded subscription
            ERR_MSG = [None]

            @inlineCallbacks
            def on_event(*args, **kwargs):
                """
                Handler for events received on the other session: re-publish on this session.
                """
                assert "details" in kwargs
                details = kwargs.pop("details")
                options = kwargs.pop("options", None)

                self.log.debug(
                    "Received event on uri={uri}, options={options} (publisher={publisher}, publisher_authid={publisher_authid}, publisher_authrole={publisher_authrole}, forward_for={forward_for})",
                    uri=uri,
                    options=options,
                    publisher=details.publisher,
                    publisher_authid=details.publisher_authid,
                    publisher_authrole=details.publisher_authrole,
                    forward_for=details.forward_for,
                )

                assert details.publisher is not None
                this_forward = {
                    "session": details.publisher,
                    "authid": details.publisher_authid,
                    "authrole": details.publisher_authrole,
                }

                if details.forward_for:
                    # the event comes already forwarded from a router node
                    # NOTE(review): this condition is always true for a non-empty list, so
                    # every already-forwarded event is skipped and the append below is
                    # currently unreachable - confirm this hop limit is intended.
                    if len(details.forward_for) >= 0:
                        self.log.debug("SKIP! already forwarded")
                        return

                    forward_for = copy.deepcopy(details.forward_for)
                    forward_for.append(this_forward)
                else:
                    forward_for = [this_forward]

                options = PublishOptions(
                    acknowledge=True,
                    exclude_me=True,
                    exclude_authid=self._exclude_authid,
                    exclude_authrole=self._exclude_authrole,
                    forward_for=forward_for,
                )

                try:
                    yield self.publish(uri, *args, options=options, **kwargs)
                except TransportLost:
                    # link is down: silently drop the event
                    return
                except ApplicationError as e:
                    if e.error not in ["wamp.close.normal"]:
                        self.log.warn("FAILED TO PUBLISH 1: {} {}".format(type(e), str(e)))
                    return
                except Exception as e:
                    if not ERR_MSG[0]:
                        self.log.warn("FAILED TO PUBLISH 2: {} {}".format(type(e), str(e)))
                        ERR_MSG[0] = True
                    return

                self.log.debug(
                    "RLink forward-published event {dir} (options={options})",
                    dir=self.DIR,
                    options=options,
                )

            try:
                sub = yield other.subscribe(on_event, uri, options=SubscribeOptions(details=True))
            except TransportLost:
                self.log.debug(
                    "on_subscription_create: could not forward-subscription '{}' as RLink is not connected".format(uri)
                )
                return

            # while waiting for the other router, the subscription on this router may
            # already have been removed again; in that case undo the forward-subscription
            if sub_id not in self._subs:
                self.log.info("subscription already gone: {uri}", uri=sub_details["uri"])
                yield sub.unsubscribe()
            else:
                self._subs[sub_id]["sub"] = sub

            self.log.debug(
                "created forwarding subscription: me={me} other={other} sub_id={sub_id} sub_details={sub_details} details={details} sub_session={sub_session}",
                me=self._session_id,
                other=other,
                sub_id=sub_id,
                sub_details=sub_details,
                details=details,
                sub_session=sub_session,
            )

        # listen to when a subscription is removed from the router
        #
        @inlineCallbacks
        def on_subscription_delete(session_id, sub_id, details=None):
            """
            Event handler fired when a subscription was deleted on this router: remove
            the matching forwarding subscription (if any) and forget the record.
            """
            self.log.debug(
                "Subscription deleted: {me} {session} {sub_id} {details}",
                me=self,
                session=session_id,
                sub_id=sub_id,
                details=details,
            )

            sub_details = self._subs.get(sub_id, None)
            if not sub_details:
                self.log.debug("subscription not tracked - huh??")
                return

            uri = sub_details["uri"]

            sub = self._subs[sub_id]["sub"]
            if sub is None:
                # see above; we might have un-subscribed here before
                # we got an answer from the other router
                self.log.info("subscription has no 'sub'")
            else:
                yield sub.unsubscribe()

            del self._subs[sub_id]

            self.log.debug("{other} unsubscribed from {uri}".format(other=other, uri=uri))

        @inlineCallbacks
        def forward_current_subs():
            """
            Forward all exact-match subscriptions that currently exist on this router.
            """
            # get current subscriptions on the router
            subs = yield self.call("wamp.subscription.list")
            for sub_id in subs["exact"]:
                sub = yield self.call("wamp.subscription.get", sub_id)
                assert sub["id"] == sub_id, "Logic error, subscription IDs don't match"
                yield on_subscription_create(self._session_id, sub)

        @inlineCallbacks
        def on_remote_join(_session, _details):
            yield forward_current_subs()

        def on_remote_leave(_session, _details):
            # The remote session has ended, clear subscription records.
            # Clearing this dictionary helps avoid the case where
            # local procedures are not subscribed on the remote leg
            # on reestablishment of remote session.
            # See: https://github.com/crossbario/crossbar/issues/1909
            self._subs = {}

        if self.IS_REMOTE_LEG:
            yield forward_current_subs()
        else:
            # from the local leg, don't try to forward events on the
            # remote leg unless the remote session is established.
            other.on("join", on_remote_join)
            other.on("leave", on_remote_leave)

        # listen to when new subscriptions are created on the local router
        yield self.subscribe(
            on_subscription_create, "wamp.subscription.on_create", options=SubscribeOptions(details_arg="details")
        )

        yield self.subscribe(
            on_subscription_delete, "wamp.subscription.on_delete", options=SubscribeOptions(details_arg="details")
        )

        self.log.debug("{me}: event forwarding setup done", me=self)

    @inlineCallbacks
    def _setup_invocation_forwarding(self, other: ApplicationSession):
        """
        Mirror the registrations of this session's router onto ``other`` and re-issue
        calls received on ``other`` through this session.

        :param other: The session forming the opposite leg of the link.
        """
        self.log.info(
            "setup invocation forwarding between {me} and {other} (exclude_authid={exclude_authid}, exclude_authrole={exclude_authrole})",
            exclude_authid=self._exclude_authid,
            exclude_authrole=self._exclude_authrole,
            me=self,
            other=other,
        )

        # called when a registration is created on the local router
        @inlineCallbacks
        def on_registration_create(reg_session, reg_details, details=None):
            """
            Event handler fired when a new registration was created on this router.

            The handler will then also register on the other router, and when receiving
            calls, re-issue those on this router.

            :param reg_session: Session the registration was created for (only logged here).
            :param reg_details: Registration details; the ``"uri"``, ``"id"`` and
                (optional) ``"invoke"`` keys are read.
            :param details: Event details of the meta event (only logged here).
            :return: None
            :raises Exception: If registering on the other session fails for a reason
                other than a lost transport or an already existing procedure.
            """
            # never forward registrations of WAMP meta procedures
            if reg_details["uri"].startswith("wamp."):
                return

            reg_id = reg_details["id"]

            if reg_id in self._regs and self._regs[reg_id]["reg"]:
                # This will happen if, partway through the registration process, the RLink disconnects
                self.log.error(
                    "on_registration_create: reg ID {reg_id} already in map {method}",
                    reg_id=reg_id,
                    method=hltype(BridgeSession._setup_invocation_forwarding),
                )
                return

            # record the registration as "pending" (reg=None) until the other router answers
            reg_details_local = copy.deepcopy(reg_details)
            if reg_id not in self._regs:
                reg_details_local["reg"] = None
                self._regs[reg_id] = reg_details_local

            uri = reg_details["uri"]

            # one-element list used as a mutable flag, so that the generic call
            # failure below is only logged once per forwarded registration
            ERR_MSG = [None]

            @inlineCallbacks
            def on_call(*args, **kwargs):
                """
                Handler for invocations received on the other session: re-issue the call
                on this session and return its result.

                :raises RuntimeError: If the invocation carries no caller identity.
                """
                assert "details" in kwargs

                details = kwargs.pop("details")
                options = kwargs.pop("options", None)

                if details.caller is None or details.caller_authrole is None or details.caller_authid is None:
                    raise RuntimeError("Internal error attempting rlink forwarding")

                self.log.info(
                    "Received invocation on uri={uri}, options={options} (caller={caller}, caller_authid={caller_authid}, caller_authrole={caller_authrole}, forward_for={forward_for})",
                    uri=uri,
                    options=options,
                    caller=details.caller,
                    caller_authid=details.caller_authid,
                    caller_authrole=details.caller_authrole,
                    forward_for=details.forward_for,
                )

                # identity of the original caller; "authid" must carry the caller's
                # authid (not its authrole), mirroring what event forwarding records
                this_forward = {
                    "session": details.caller,
                    "authid": details.caller_authid,
                    "authrole": details.caller_authrole,
                }

                if details.forward_for:
                    # the call comes already forwarded from a router node ..
                    # NOTE(review): this condition is always true for a non-empty list, so
                    # every already-forwarded call is skipped (returning None) and the
                    # append below is currently unreachable - confirm this is intended.
                    if len(details.forward_for) >= 0:
                        self.log.debug("SKIP! already forwarded")
                        return

                    forward_for = copy.deepcopy(details.forward_for)
                    forward_for.append(this_forward)
                else:
                    forward_for = [this_forward]

                options = CallOptions(forward_for=forward_for)

                try:
                    result = yield self.call(uri, *args, options=options, **kwargs)
                except TransportLost:
                    # link is down: give up on this call
                    return
                except ApplicationError as e:
                    if e.error not in ["wamp.close.normal"]:
                        self.log.warn("FAILED TO CALL 1: {} {}".format(type(e), str(e)))
                    return
                except Exception as e:
                    if not ERR_MSG[0]:
                        self.log.warn("FAILED TO CALL 2: {} {}".format(type(e), str(e)))
                        ERR_MSG[0] = True
                    return

                self.log.info(
                    "RLink forward-invoked call {dir} (options={options})",
                    dir=self.DIR,
                    options=options,
                )
                return result

            try:
                reg = yield other.register(
                    on_call,
                    uri,
                    options=RegisterOptions(
                        details_arg="details",
                        invoke=reg_details.get("invoke", None),
                    ),
                )
            except TransportLost:
                self.log.debug(
                    "on_registration_create: could not forward-register '{}' as RLink is not connected".format(uri)
                )
                return
            except Exception as e:
                # FIXME: partially fixes https://github.com/crossbario/crossbar/issues/1894,
                #  however we need to make sure this situation never happens.
                if isinstance(e, ApplicationError) and e.error == "wamp.error.procedure_already_exists":
                    other_leg = "local" if self.IS_REMOTE_LEG else "remote"
                    self.log.debug(
                        f"on_registration_create: tried to register procedure {uri} on {other_leg} "
                        f"session but it's already registered."
                    )
                    return
                # chain the original error so the root cause stays visible in tracebacks
                raise Exception("fatal: could not forward-register '{}'".format(uri)) from e

            # so ... if, during that "yield" above while we register
            # on the "other" router, *this* router may have already
            # un-registered. If that happened, our registration will
            # be gone, so we immediately un-register on the other side
            if reg_id not in self._regs:
                self.log.info("registration already gone: {uri}", uri=reg_details["uri"])
                yield reg.unregister()
            else:
                self._regs[reg_id]["reg"] = reg

            self.log.debug(
                "created forwarding registration: me={me} other={other} reg_id={reg_id} reg_details={reg_details} details={details} reg_session={reg_session}",
                me=self._session_id,
                other=other._session_id,
                reg_id=reg_id,
                reg_details=reg_details,
                details=details,
                reg_session=reg_session,
            )

        # called when a registration is removed from the local router
        @inlineCallbacks
        def on_registration_delete(session_id, reg_id, details=None):
            """
            Event handler fired when a registration was deleted on this router: remove
            the matching forwarding registration (if any) and forget the record.
            """
            self.log.debug(
                "Registration deleted: {me} {session} {reg_id} {details}",
                me=self,
                session=session_id,
                reg_id=reg_id,
                details=details,
            )

            reg_details = self._regs.get(reg_id, None)
            if not reg_details:
                self.log.debug("registration not tracked - huh??")
                return

            uri = reg_details["uri"]

            reg = self._regs[reg_id]["reg"]
            if reg is None:
                # see above; we might have un-registered here before
                # we got an answer from the other router
                self.log.debug("registration has no 'reg'")
            else:
                yield reg.unregister()

            del self._regs[reg_id]

            self.log.debug("{other} unregistered from {uri}".format(other=other, uri=uri))

        @inlineCallbacks
        def register_current():
            """
            Forward all exact-match registrations that currently exist on this router.
            """
            # get current registrations on the router
            regs = yield self.call("wamp.registration.list")
            for reg_id in regs["exact"]:
                reg = yield self.call("wamp.registration.get", reg_id)
                assert reg["id"] == reg_id, "Logic error, registration IDs don't match"
                yield on_registration_create(self._session_id, reg)

        @inlineCallbacks
        def on_remote_join(_session, _details):
            yield register_current()

        def on_remote_leave(_session, _details):
            # The remote session has ended, clear registration records.
            # Clearing this dictionary helps avoid the case where
            # local procedures are not registered on the remote leg
            # on reestablishment of remote session.
            # See: https://github.com/crossbario/crossbar/issues/1909
            self._regs = {}

        if self.IS_REMOTE_LEG:
            yield register_current()
        else:
            # from the local leg, don't try to register procedures on the
            # remote leg unless the remote session is established.
            # This avoids issues where in-router components register procedures
            # on startup and when the rlink is setup, the local leg tries to
            # register procedures on the remote leg, even though the connection
            # hasn't established.
            # See: https://github.com/crossbario/crossbar/issues/1895
            other.on("join", on_remote_join)
            other.on("leave", on_remote_leave)

        # listen to when new registrations are created on the local router
        yield self.subscribe(
            on_registration_create, "wamp.registration.on_create", options=SubscribeOptions(details_arg="details")
        )

        # listen to when a registration is removed from the local router
        yield self.subscribe(
            on_registration_delete, "wamp.registration.on_delete", options=SubscribeOptions(details_arg="details")
        )

        self.log.info("{me}: call forwarding setup done", me=self._session_id)


class RLinkLocalSession(BridgeSession):
    """
    Local leg of a router-to-router link; this session runs embedded inside the local router.
    """

    log = make_logger()

    IS_REMOTE_LEG = False

    # direction in which events published via this session are travelling
    DIR = hl("from remote to local", color="yellow", bold=True)

    def onConnect(self):
        """
        Join the configured realm, using the rlink ID from the session's extra
        configuration both as authid and inside authextra, then remember the tracker.
        """
        extra = self.config.extra
        self.log.info("{klass}.onConnect()", klass=self.__class__.__name__)

        rlink_authextra = {"rlink": extra["rlink"]}
        self.join(self.config.realm, authid=extra["rlink"], authextra=rlink_authextra)

        # the tracker is only stored once the join request has been issued
        self._tracker = extra["tracker"]

    @inlineCallbacks
    def onJoin(self, details):
        """
        Set up the forwarding requested in the session's extra configuration and
        fire the ``on_ready`` deferred (unless it was already called).

        :param details: Session details of the joined session.
        """
        extra = self.config.extra
        assert extra and "on_ready" in extra
        assert extra and "other" in extra

        remote_leg = extra["other"]
        assert isinstance(remote_leg, RLinkRemoteSession)

        self._exclude_authid = extra.get("exclude_authid", None)
        self._exclude_authrole = extra.get("exclude_authrole", None)

        # local->remote event forwarding, if enabled
        do_events = extra.get("forward_events", False)
        if do_events:
            yield self._setup_event_forwarding(remote_leg)

        # local->remote invocation forwarding, if enabled
        do_invocations = extra.get("forward_invocations", False)
        if do_invocations:
            yield self._setup_invocation_forwarding(remote_leg)

        self.log.debug(
            "Router link local session ready (forward_events={forward_events}, forward_invocations={forward_invocations}, realm={realm}, authid={authid}, authrole={authrole}, session={session}) {method}",
            forward_events=hluserid(do_events),
            forward_invocations=hluserid(do_invocations),
            realm=hluserid(details.realm),
            authid=hluserid(details.authid),
            authrole=hluserid(details.authrole),
            session=hlid(details.session),
            method=hltype(RLinkLocalSession.onJoin),
        )

        # signal readiness exactly once
        ready = extra.get("on_ready", None)
        if ready and not ready.called:
            extra["on_ready"].callback(self)

    def onLeave(self, details):
        """
        Log that the local leg went down, then run the base class leave handling.

        :param details: Close details of the session.
        """
        session_info = dict(
            realm=hluserid(self.config.realm),
            authid=hluserid(self._authid),
            authrole=hluserid(self._authrole),
            session=hlid(self._session_id),
        )
        self.log.warn(
            "Router link local session down! (realm={realm}, authid={authid}, authrole={authrole}, session={session}, details={details}) {method}",
            method=hltype(RLinkLocalSession.onLeave),
            details=details,
            **session_info,
        )

        BridgeSession.onLeave(self, details)


class RLinkRemoteSession(BridgeSession):
    """
    This session is the remote leg of the router-to-router link.
    """

    log = make_logger()

    IS_REMOTE_LEG = True

    # direction in which events are flowing (published) via this session
    DIR = hl("from local to remote", color="yellow", bold=True)

    def __init__(self, config):
        """
        :param config: Session configuration; ``config.extra["rlink_manager"]`` must
            hold the rlink manager this session belongs to.
        """
        # the base class initializes the subscription/registration maps
        BridgeSession.__init__(self, config)

        # import here to resolve import dependency issues
        from crossbar.worker.router import RouterController

        self._rlink_manager: RLinkManager = self.config.extra["rlink_manager"]
        self._router_controller: RouterController = self._rlink_manager.controller

    # FIXME: async? see below
    def onConnect(self):
        """
        Fetch the public key from the controller and, once available, join the
        realm using WAMP-cryptosign.

        :return: The deferred returned by ``get_public_key()``, with the join
            attached as callback.
        """
        self.log.info("{func}() ...", func=hltype(self.onConnect))

        authid = self.config.extra.get("authid", None)
        authrole = self.config.extra.get("authrole", None)
        authextra = self.config.extra.get("authextra", {})

        # FIXME: use cryptosign-proxy
        authmethods = ["cryptosign"]

        # use WorkerController.get_public_key to call node controller
        # FIXME: the following does _not_ work with onConnect (?!)
        # _public_key = await self._router_controller.get_public_key()

        def actually_join(_public_key):
            authextra.update(
                {
                    # forward the client pubkey: this allows us to omit authid as
                    # the router can identify us with the pubkey already
                    "pubkey": _public_key,
                    # not yet implemented. a public key the router should provide
                    # a trustchain for its public key. the trustroot can eg be
                    # hard-coded in the client, or come from a command line option.
                    "trustroot": None,
                    # not yet implemented. for authenticating the router, this
                    # challenge will need to be signed by the router and send back
                    # in AUTHENTICATE for client to verify. A string with a hex
                    # encoded 32 bytes random value.
                    "challenge": None,
                    # https://tools.ietf.org/html/rfc5929
                    "channel_binding": "tls-unique",
                }
            )

            self.log.info(
                '{func} joining with realm="{realm}", authmethods={authmethods}, authid="{authid}", authrole="{authrole}", authextra={authextra}',
                func=hltype(self.onConnect),
                realm=hlval(self.config.realm),
                authmethods=hlval(authmethods),
                authid=hlval(authid),
                authrole=hlval(authrole),
                authextra=authextra,
            )

            self.join(
                self.config.realm, authmethods=authmethods, authid=authid, authrole=authrole, authextra=authextra
            )

        res = self._rlink_manager._controller.get_public_key()
        res.addCallback(actually_join)
        self.log.info("{func}() done (res={res}).", func=hltype(self.onConnect), res=res)
        return res

    # FIXME: async? see below
    def onChallenge(self, challenge):
        """
        Answer a WAMP-cryptosign challenge by letting the router controller sign it.

        :param challenge: The challenge received from the remote router.
        :return: Whatever ``sign_challenge()`` of the router controller returns.
        :raises Exception: If the challenge is for any method other than ``cryptosign``.
        """
        self.log.debug("{func}(challenge={challenge})", func=hltype(self.onChallenge), challenge=challenge)

        if challenge.method == "cryptosign":
            # alright, we've got a challenge from the router.

            # sign the challenge with our private key.
            channel_id_type = "tls-unique"
            channel_id_map = self._router_controller._transport.transport_details.channel_id
            if channel_id_type in channel_id_map:
                channel_id = channel_id_map[channel_id_type]
            else:
                # no channel binding available for this transport
                channel_id = None
                channel_id_type = None

            # use WorkerController.get_public_key to call node controller
            # FIXME: await?
            signed_challenge = self._router_controller.sign_challenge(challenge, channel_id, channel_id_type)

            # send back the signed challenge for verification
            return signed_challenge

        else:
            raise Exception(
                "internal error: we asked to authenticate using wamp-cryptosign, but now received a challenge for {}".format(
                    challenge.method
                )
            )

    @inlineCallbacks
    def onJoin(self, details):
        """
        Mark the link as connected on the local leg's tracker, set up the forwarding
        requested in the session's extra configuration and fire the ``on_ready``
        deferred (unless it was already called).

        :param details: Session details of the joined session.
        """
        self.log.debug("{klass}.onJoin(details={details})", klass=self.__class__.__name__, details=details)

        assert self.config.extra and "on_ready" in self.config.extra
        assert self.config.extra and "other" in self.config.extra

        local = self.config.extra["other"]
        assert isinstance(local, RLinkLocalSession)
        local._tracker.connected = True

        self._exclude_authid = self.config.extra.get("exclude_authid", None)
        self._exclude_authrole = self.config.extra.get("exclude_authrole", None)

        # setup remote->local event forwarding
        forward_events = self.config.extra.get("forward_events", False)
        if forward_events:
            yield self._setup_event_forwarding(local)

        # setup remote->local invocation forwarding
        forward_invocations = self.config.extra.get("forward_invocations", False)
        if forward_invocations:
            yield self._setup_invocation_forwarding(local)

        self.log.info(
            "{klass}.onJoin(): rlink remote session ready (forward_events={forward_events}, forward_invocations={forward_invocations}, realm={realm}, authid={authid}, authrole={authrole}, session={session}) {method}",
            klass=self.__class__.__name__,
            method=hltype(RLinkRemoteSession.onJoin),
            forward_events=hluserid(forward_events),
            forward_invocations=hluserid(forward_invocations),
            realm=hluserid(details.realm),
            authid=hluserid(details.authid),
            authrole=hluserid(details.authrole),
            session=hlid(details.session),
        )

        # we are ready!
        on_ready = self.config.extra.get("on_ready", None)
        if on_ready and not on_ready.called:
            self.config.extra["on_ready"].callback(self)

    @inlineCallbacks
    def onLeave(self, details):
        """
        Tear down all forwarding subscriptions and registrations tracked by this
        session, mark the link as disconnected on the local leg's tracker and run
        the base class leave handling.

        :param details: Close details of the session.
        """
        # When the rlink is going down, make sure to unsubscribe to
        # all events that are subscribed on the local-leg.
        # This avoids duplicate events that would otherwise arrive
        # See: https://github.com/crossbario/crossbar/issues/1916
        #
        # Iterate over snapshots: the maps may be modified by other handlers while
        # we wait on a yield. A record may also still hold None when its forwarding
        # subscription/registration was never confirmed, so guard before ".active".
        for record in list(self._subs.values()):
            sub = record["sub"]
            if sub is not None and sub.active:
                yield sub.unsubscribe()
        self._subs = {}

        for record in list(self._regs.values()):
            reg = record["reg"]
            if reg is not None and reg.active:
                yield reg.unregister()
        self._regs = {}

        self.config.extra["other"]._tracker.connected = False
        self.log.warn(
            "{klass}.onLeave(): rlink remote session left! (realm={realm}, authid={authid}, authrole={authrole}, session={session}, details={details}) {method}",
            klass=self.__class__.__name__,
            method=hltype(RLinkRemoteSession.onLeave),
            realm=hluserid(self.config.realm),
            authid=hluserid(self._authid),
            authrole=hluserid(self._authrole),
            session=hlid(self._session_id),
            details=details,
        )

        BridgeSession.onLeave(self, details)






class RLinkConfig(object):
    """
    Plain value object holding the configuration of one router-to-router link.
    """

    def __init__(
        self,
        realm,
        transport,
        authid,
        exclude_authid,
        forward_local_events,
        forward_remote_events,
        forward_local_invocations,
        forward_remote_invocations,
    ):
        """
        :param realm: The remote router realm.
        :type realm: str

        :param transport: The transport for connecting to the remote router.
        :type transport: Mapping (as validated by :meth:`parse`)

        :param authid: Optional authid (presumably used when authenticating to the
            remote router - confirm against the link setup code).
        :type authid: str or None

        :param exclude_authid: Sequence of authid strings (presumably excluded from
            receiving forwarded events - confirm against the link setup code).

        :param forward_local_events: Flag stored as-is; see :meth:`parse` for its default.
        :type forward_local_events: bool

        :param forward_remote_events: Flag stored as-is; see :meth:`parse` for its default.
        :type forward_remote_events: bool

        :param forward_local_invocations: Flag stored as-is; see :meth:`parse` for its default.
        :type forward_local_invocations: bool

        :param forward_remote_invocations: Flag stored as-is; see :meth:`parse` for its default.
        :type forward_remote_invocations: bool
        """
        self.realm = realm
        self.transport = transport
        self.authid = authid
        self.exclude_authid = exclude_authid
        self.forward_local_events = forward_local_events
        self.forward_remote_events = forward_remote_events
        self.forward_local_invocations = forward_local_invocations
        self.forward_remote_invocations = forward_remote_invocations

    def __str__(self):
        """Return the pretty-printed marshalled configuration."""
        return pprint.pformat(self.marshal())

    def marshal(self):
        """
        Return the configuration as a plain dict, keyed by attribute name.

        :rtype: dict
        """
        obj = {
            "realm": self.realm,
            "transport": self.transport,
            "authid": self.authid,
            "exclude_authid": self.exclude_authid,
            "forward_local_events": self.forward_local_events,
            "forward_remote_events": self.forward_remote_events,
            "forward_local_invocations": self.forward_local_invocations,
            "forward_remote_invocations": self.forward_remote_invocations,
        }
        return obj

    @staticmethod
    def parse(personality, obj, id=None):
        """
        Parses a generic object (eg a dict) into a typed object of this class.

        Note that when ``id`` is given, it is written into ``obj["id"]``, i.e. the
        passed-in dict is modified.

        :param personality: Passed through to ``check_connecting_transport``.

        :param obj: The generic object to parse.
        :type obj: dict

        :param id: Optional link ID to store in ``obj`` before validation.
        :type id: str or None

        :returns: Router link configuration
        :rtype: :class:`crossbar.worker.rlink.RLinkConfig`
        """
        # assert isinstance(personality, Personality)
        assert isinstance(obj, dict)
        assert id is None or isinstance(id, str)

        if id:
            obj["id"] = id

        # attribute name -> (required?, allowed types)
        check_dict_args(
            {
                "id": (False, [str]),
                "realm": (True, [str]),
                "transport": (True, [Mapping]),
                "authid": (False, [str]),
                "exclude_authid": (False, [Sequence]),
                "forward_local_events": (False, [bool]),
                "forward_remote_events": (False, [bool]),
                "forward_local_invocations": (False, [bool]),
                "forward_remote_invocations": (False, [bool]),
            },
            obj,
            "router link configuration",
        )

        realm = obj["realm"]
        authid = obj.get("authid", None)
        exclude_authid = obj.get("exclude_authid", [])
        for aid in exclude_authid:
            assert isinstance(aid, str)

        # all forwarding directions are enabled unless explicitly configured otherwise
        forward_local_events = obj.get("forward_local_events", True)
        forward_remote_events = obj.get("forward_remote_events", True)
        forward_local_invocations = obj.get("forward_local_invocations", True)
        forward_remote_invocations = obj.get("forward_remote_invocations", True)
        transport = obj["transport"]

        check_realm_name(realm)
        check_connecting_transport(personality, transport)

        config = RLinkConfig(
            realm=realm,
            transport=transport,
            authid=authid,
            exclude_authid=exclude_authid,
            forward_local_events=forward_local_events,
            forward_remote_events=forward_remote_events,
            forward_local_invocations=forward_local_invocations,
            forward_remote_invocations=forward_remote_invocations,
        )

        return config
[docs] class RLinkManager(object): """ Router-to-router links manager. """
[docs] log = make_logger()
def __init__(self, realm, controller): """ :param realm: The (local) router realm this object is managing links for. :param controller: The router controller this rlink is running under. """ # import here to resolve import dependency issues from crossbar.edge.worker.router import ExtRouterRealm from crossbar.worker.router import RouterController
[docs] self._realm: ExtRouterRealm = realm
[docs] self._controller: RouterController = controller
# map: link_id -> RLink @property
[docs] def realm(self): return self._realm
@property
[docs] def controller(self): return self._controller
[docs] def __getitem__(self, link_id): if link_id in self._links: return self._links[link_id] else: raise KeyError('no router link with ID "{}"'.format(link_id))
[docs] def __contains__(self, link_id): return link_id in self._links
[docs] def __len__(self): return len(self._links)
[docs] def __setitem__(self, item, value): raise Exception("__setitem__ not supported on this class")
[docs] def __delitem__(self, item): raise Exception("__delitem__ not supported on this class")
[docs] def keys(self): return self._links.keys()
@inlineCallbacks @inlineCallbacks