#####################################################################################
#
# Copyright (c) typedef int GmbH
# SPDX-License-Identifier: EUPL-1.2
#
#####################################################################################
import copy
import pprint
from collections.abc import Mapping, Sequence
from typing import Dict
from autobahn import util
from autobahn.twisted.wamp import ApplicationRunner, ApplicationSession
from autobahn.util import hl, hlid, hltype, hluserid, hlval
from autobahn.wamp.exception import ApplicationError, TransportLost
from autobahn.wamp.message import Event, Invocation
from autobahn.wamp.types import (
CallOptions,
ComponentConfig,
PublishOptions,
RegisterOptions,
SessionIdent,
SubscribeOptions,
)
from twisted.internet.defer import Deferred, inlineCallbacks
from txaio import make_logger, time_ns
from crossbar.common.checkconfig import check_connecting_transport, check_dict_args, check_realm_name
from crossbar.common.twisted.endpoint import create_connecting_endpoint_from_config
# Names exported by ``from <this module> import *``.
__all__ = ("RLink", "RLinkConfig", "RLinkManager")
class BridgeSession(ApplicationSession):
    """
    Common base of the two router-to-router link legs.

    A bridge session watches the subscriptions and registrations that exist on
    the router it is joined to and mirrors them on the *other* leg. Events and
    calls received on the other leg are then re-published / re-issued through
    this session.

    Subclasses must define ``IS_REMOTE_LEG`` (bool) and ``DIR`` (a label used
    in log output); both are read by the forwarding code below.
    """

    log = make_logger()

    def __init__(self, config):
        ApplicationSession.__init__(self, config)

        # subscription ID on this router -> copy of the subscription details,
        # extended with a "sub" entry that holds the mirrored subscription on
        # the other leg (None until the other leg has answered)
        self._subs = {}

        # registration-id's of remote registrations from an rlink
        # (same layout as self._subs, with a "reg" entry)
        self._regs = {}

        self._exclude_authid = None
        self._exclude_authrole = None

    def onMessage(self, msg):
        """
        Restore publisher/caller identity from ``msg._router_internal`` (when
        set) on events and invocations, then continue with normal processing.
        """
        if msg._router_internal is not None:
            if isinstance(msg, Event):
                msg.publisher, msg.publisher_authid, msg.publisher_authrole = msg._router_internal
            elif isinstance(msg, Invocation):
                msg.caller, msg.caller_authid, msg.caller_authrole = msg._router_internal
        return super(BridgeSession, self).onMessage(msg)

    @inlineCallbacks
    def _setup_event_forwarding(self, other):
        """
        Mirror the subscriptions of this router on ``other`` and re-publish
        events received there via this session.

        :param other: The session forming the other leg of the link.
        """
        self.log.debug(
            "setup event forwarding between {me} and {other} (exclude_authid={exclude_authid}, exclude_authrole={exclude_authrole})",
            exclude_authid=self._exclude_authid,
            exclude_authrole=self._exclude_authrole,
            me=self._session_id,
            other=other,
        )

        @inlineCallbacks
        def on_subscription_create(sub_session, sub_details, details=None):
            """
            Event handler fired when a new subscription was created on this router.

            The handler will then also subscribe on the other router, and when receiving
            events, re-publish those on this router.

            :param sub_session: First argument of the meta event (only used for logging).
            :param sub_details: Subscription details; ``"id"`` and ``"uri"`` are read.
            :param details: Event details (only used for logging).
            :return: A deferred that fires when the subscription has been mirrored
                (or mirroring was skipped).
            """
            uri = sub_details["uri"]

            # WAMP meta topics are never forwarded
            if uri.startswith("wamp."):
                return

            sub_id = sub_details["id"]

            tracked = self._subs.get(sub_id)
            if tracked and tracked.get("sub"):
                # This will happen if, partway through the subscription process, the RLink disconnects
                self.log.error(
                    "on_subscription_create: sub ID {sub_id} already in map {method}",
                    sub_id=sub_id,
                    method=hltype(BridgeSession._setup_event_forwarding),
                )
                return

            if sub_id not in self._subs:
                # Track the subscription before talking to the other leg, so a
                # deletion that arrives during the yield below can be detected.
                # An already tracked record is left in place so that its "sub"
                # key is always present.
                sub_details_local = copy.deepcopy(sub_details)
                sub_details_local["sub"] = None
                self._subs[sub_id] = sub_details_local

            # one-element list so the nested handler can flip the flag
            ERR_MSG = [None]

            @inlineCallbacks
            def on_event(*args, **kwargs):
                assert "details" in kwargs
                details = kwargs.pop("details")
                options = kwargs.pop("options", None)

                self.log.debug(
                    "Received event on uri={uri}, options={options} (publisher={publisher}, publisher_authid={publisher_authid}, publisher_authrole={publisher_authrole}, forward_for={forward_for})",
                    uri=uri,
                    options=options,
                    publisher=details.publisher,
                    publisher_authid=details.publisher_authid,
                    publisher_authrole=details.publisher_authrole,
                    forward_for=details.forward_for,
                )

                assert details.publisher is not None

                if details.forward_for:
                    # the event comes already forwarded from a router node:
                    # it is dropped rather than forwarded a second time
                    self.log.debug("SKIP! already forwarded")
                    return

                forward_for = [
                    {
                        "session": details.publisher,
                        "authid": details.publisher_authid,
                        "authrole": details.publisher_authrole,
                    }
                ]

                options = PublishOptions(
                    acknowledge=True,
                    exclude_me=True,
                    exclude_authid=self._exclude_authid,
                    exclude_authrole=self._exclude_authrole,
                    forward_for=forward_for,
                )

                try:
                    yield self.publish(uri, *args, options=options, **kwargs)
                except TransportLost:
                    return
                except ApplicationError as e:
                    if e.error not in ["wamp.close.normal"]:
                        self.log.warn("FAILED TO PUBLISH 1: {} {}".format(type(e), str(e)))
                    return
                except Exception as e:
                    # only warn once per forwarded subscription
                    if not ERR_MSG[0]:
                        self.log.warn("FAILED TO PUBLISH 2: {} {}".format(type(e), str(e)))
                        ERR_MSG[0] = True
                    return

                self.log.debug(
                    "RLink forward-published event {dir} (options={options})",
                    dir=self.DIR,
                    options=options,
                )

            try:
                sub = yield other.subscribe(on_event, uri, options=SubscribeOptions(details=True))
            except TransportLost:
                self.log.debug(
                    "on_subscription_create: could not forward-subscription '{}' as RLink is not connected".format(uri)
                )
                return

            # while waiting for the other router, the subscription on this
            # router may already have been deleted again
            if sub_id not in self._subs:
                self.log.info("subscription already gone: {uri}", uri=sub_details["uri"])
                yield sub.unsubscribe()
            else:
                self._subs[sub_id]["sub"] = sub

            self.log.debug(
                "created forwarding subscription: me={me} other={other} sub_id={sub_id} sub_details={sub_details} details={details} sub_session={sub_session}",
                me=self._session_id,
                other=other,
                sub_id=sub_id,
                sub_details=sub_details,
                details=details,
                sub_session=sub_session,
            )

        # listen to when a subscription is removed from the router
        #
        @inlineCallbacks
        def on_subscription_delete(session_id, sub_id, details=None):
            self.log.debug(
                "Subscription deleted: {me} {session} {sub_id} {details}",
                me=self,
                session=session_id,
                sub_id=sub_id,
                details=details,
            )

            sub_details = self._subs.get(sub_id, None)
            if not sub_details:
                self.log.debug("subscription not tracked - huh??")
                return

            uri = sub_details["uri"]

            sub = sub_details.get("sub")
            if sub is None:
                # see above; we might have un-subscribed here before
                # we got an answer from the other router
                self.log.info("subscription has no 'sub'")
            else:
                yield sub.unsubscribe()

            # the map may have been reset (remote leg left) during the yield
            self._subs.pop(sub_id, None)

            self.log.debug("{other} unsubscribed from {uri}".format(other=other, uri=uri))

        @inlineCallbacks
        def forward_current_subs():
            # get current subscriptions on the router
            subs = yield self.call("wamp.subscription.list")
            for sub_id in subs["exact"]:
                sub = yield self.call("wamp.subscription.get", sub_id)
                assert sub["id"] == sub_id, "Logic error, subscription IDs don't match"
                yield on_subscription_create(self._session_id, sub)

        @inlineCallbacks
        def on_remote_join(_session, _details):
            yield forward_current_subs()

        def on_remote_leave(_session, _details):
            # The remote session has ended, clear subscription records.
            # Clearing this dictionary helps avoid the case where
            # local procedures are not subscribed on the remote leg
            # on reestablishment of remote session.
            # See: https://github.com/crossbario/crossbar/issues/1909
            self._subs = {}

        if self.IS_REMOTE_LEG:
            yield forward_current_subs()
        else:
            # from the local leg, don't try to forward events on the
            # remote leg unless the remote session is established.
            other.on("join", on_remote_join)
            other.on("leave", on_remote_leave)

        # listen to when new subscriptions are created on the local router
        yield self.subscribe(
            on_subscription_create, "wamp.subscription.on_create", options=SubscribeOptions(details_arg="details")
        )

        yield self.subscribe(
            on_subscription_delete, "wamp.subscription.on_delete", options=SubscribeOptions(details_arg="details")
        )

        self.log.debug("{me}: event forwarding setup done", me=self)

    @inlineCallbacks
    def _setup_invocation_forwarding(self, other: ApplicationSession):
        """
        Mirror the registrations of this router on ``other`` and re-issue calls
        received there via this session.

        :param other: The session forming the other leg of the link.
        """
        self.log.info(
            "setup invocation forwarding between {me} and {other} (exclude_authid={exclude_authid}, exclude_authrole={exclude_authrole})",
            exclude_authid=self._exclude_authid,
            exclude_authrole=self._exclude_authrole,
            me=self,
            other=other,
        )

        # called when a registration is created on the local router
        @inlineCallbacks
        def on_registration_create(reg_session, reg_details, details=None):
            """
            Event handler fired when a new registration was created on this router.

            The handler will then also register on the other router, and when receiving
            calls, re-issue those on this router.

            :param reg_session: First argument of the meta event (only used for logging).
            :param reg_details: Registration details; ``"id"``, ``"uri"`` and the
                optional ``"invoke"`` are read.
            :param details: Event details (only used for logging).
            :return: A deferred that fires when the registration has been mirrored
                (or mirroring was skipped).
            :raises Exception: If registering on the other leg fails for a reason
                other than a lost transport or an already existing procedure.
            """
            uri = reg_details["uri"]

            # WAMP meta procedures are never forwarded
            if uri.startswith("wamp."):
                return

            reg_id = reg_details["id"]

            tracked = self._regs.get(reg_id)
            if tracked and tracked.get("reg"):
                # This will happen if, partway through the registration process, the RLink disconnects
                self.log.error(
                    "on_registration_create: reg ID {reg_id} already in map {method}",
                    reg_id=reg_id,
                    method=hltype(BridgeSession._setup_invocation_forwarding),
                )
                return

            if reg_id not in self._regs:
                # Track the registration before talking to the other leg; an
                # already tracked record is left in place so that its "reg"
                # key is always present.
                reg_details_local = copy.deepcopy(reg_details)
                reg_details_local["reg"] = None
                self._regs[reg_id] = reg_details_local

            # one-element list so the nested handler can flip the flag
            ERR_MSG = [None]

            @inlineCallbacks
            def on_call(*args, **kwargs):
                assert "details" in kwargs
                details = kwargs.pop("details")
                options = kwargs.pop("options", None)

                if details.caller is None or details.caller_authrole is None or details.caller_authid is None:
                    raise RuntimeError("Internal error attempting rlink forwarding")

                self.log.info(
                    "Received invocation on uri={uri}, options={options} (caller={caller}, caller_authid={caller_authid}, caller_authrole={caller_authrole}, forward_for={forward_for})",
                    uri=uri,
                    options=options,
                    caller=details.caller,
                    caller_authid=details.caller_authid,
                    caller_authrole=details.caller_authrole,
                    forward_for=details.forward_for,
                )

                if details.forward_for:
                    # the call comes already forwarded from a router node:
                    # it is dropped (the caller gets None) rather than
                    # forwarded a second time
                    self.log.debug("SKIP! already forwarded")
                    return

                forward_for = [
                    {
                        "session": details.caller,
                        # the caller's authid (not its authrole) identifies it
                        "authid": details.caller_authid,
                        "authrole": details.caller_authrole,
                    }
                ]

                options = CallOptions(forward_for=forward_for)

                try:
                    result = yield self.call(uri, *args, options=options, **kwargs)
                except TransportLost:
                    return
                except ApplicationError as e:
                    if e.error not in ["wamp.close.normal"]:
                        self.log.warn("FAILED TO CALL 1: {} {}".format(type(e), str(e)))
                    return
                except Exception as e:
                    # only warn once per forwarded registration
                    if not ERR_MSG[0]:
                        self.log.warn("FAILED TO CALL 2: {} {}".format(type(e), str(e)))
                        ERR_MSG[0] = True
                    return

                self.log.info(
                    "RLink forward-invoked call {dir} (options={options})",
                    dir=self.DIR,
                    options=options,
                )
                return result

            try:
                reg = yield other.register(
                    on_call,
                    uri,
                    options=RegisterOptions(
                        details_arg="details",
                        invoke=reg_details.get("invoke", None),
                    ),
                )
            except TransportLost:
                self.log.debug(
                    "on_registration_create: could not forward-register '{}' as RLink is not connected".format(uri)
                )
                return
            except Exception as e:
                # FIXME: partially fixes https://github.com/crossbario/crossbar/issues/1894,
                # however we need to make sure this situation never happens.
                if isinstance(e, ApplicationError) and e.error == "wamp.error.procedure_already_exists":
                    other_leg = "local" if self.IS_REMOTE_LEG else "remote"
                    self.log.debug(
                        f"on_registration_create: tried to register procedure {uri} on {other_leg} "
                        f"session but it's already registered."
                    )
                    return
                # keep the original failure attached as the cause
                raise Exception("fatal: could not forward-register '{}'".format(uri)) from e

            # so ... if, during that "yield" above while we register
            # on the "other" router, *this* router may have already
            # un-registered. If that happened, our registration will
            # be gone, so we immediately un-register on the other side
            if reg_id not in self._regs:
                self.log.info("registration already gone: {uri}", uri=reg_details["uri"])
                yield reg.unregister()
            else:
                self._regs[reg_id]["reg"] = reg

            self.log.debug(
                "created forwarding registration: me={me} other={other} reg_id={reg_id} reg_details={reg_details} details={details} reg_session={reg_session}",
                me=self._session_id,
                other=other._session_id,
                reg_id=reg_id,
                reg_details=reg_details,
                details=details,
                reg_session=reg_session,
            )

        # called when a registration is removed from the local router
        @inlineCallbacks
        def on_registration_delete(session_id, reg_id, details=None):
            self.log.debug(
                "Registration deleted: {me} {session} {reg_id} {details}",
                me=self,
                session=session_id,
                reg_id=reg_id,
                details=details,
            )

            reg_details = self._regs.get(reg_id, None)
            if not reg_details:
                self.log.debug("registration not tracked - huh??")
                return

            uri = reg_details["uri"]

            reg = reg_details.get("reg")
            if reg is None:
                # see above; we might have un-registered here before
                # we got an answer from the other router
                self.log.debug("registration has no 'reg'")
            else:
                yield reg.unregister()

            # the map may have been reset (remote leg left) during the yield
            self._regs.pop(reg_id, None)

            self.log.debug("{other} unregistered from {uri}".format(other=other, uri=uri))

        @inlineCallbacks
        def register_current():
            # get current registrations on the router
            regs = yield self.call("wamp.registration.list")
            for reg_id in regs["exact"]:
                reg = yield self.call("wamp.registration.get", reg_id)
                assert reg["id"] == reg_id, "Logic error, registration IDs don't match"
                yield on_registration_create(self._session_id, reg)

        @inlineCallbacks
        def on_remote_join(_session, _details):
            yield register_current()

        def on_remote_leave(_session, _details):
            # The remote session has ended, clear registration records.
            # Clearing this dictionary helps avoid the case where
            # local procedures are not registered on the remote leg
            # on reestablishment of remote session.
            # See: https://github.com/crossbario/crossbar/issues/1909
            self._regs = {}

        if self.IS_REMOTE_LEG:
            yield register_current()
        else:
            # from the local leg, don't try to register procedures on the
            # remote leg unless the remote session is established.
            # This avoids issues where in-router components register procedures
            # on startup and when the rlink is setup, the local leg tries to
            # register procedures on the remote leg, even though the connection
            # hasn't established.
            # See: https://github.com/crossbario/crossbar/issues/1895
            other.on("join", on_remote_join)
            other.on("leave", on_remote_leave)

        # listen to when new registrations are created on the local router
        yield self.subscribe(
            on_registration_create, "wamp.registration.on_create", options=SubscribeOptions(details_arg="details")
        )

        # listen to when a registration is removed from the local router
        yield self.subscribe(
            on_registration_delete, "wamp.registration.on_delete", options=SubscribeOptions(details_arg="details")
        )

        self.log.info("{me}: call forwarding setup done", me=self._session_id)
class RLinkLocalSession(BridgeSession):
    """
    Local leg of a router-to-router link; this session runs embedded inside
    the local router.
    """

    log = make_logger()

    IS_REMOTE_LEG = False

    # direction in which events are flowing (published) via this session
    DIR = hl("from remote to local", color="yellow", bold=True)

    def onConnect(self):
        """
        Join the configured realm, using the link ID from ``config.extra["rlink"]``
        as authid, and remember the tracker object from ``config.extra["tracker"]``.
        """
        self.log.info("{klass}.onConnect()", klass=self.__class__.__name__)
        # _BridgeSession.onConnect(self)
        link_id = self.config.extra["rlink"]
        self.join(self.config.realm, authid=link_id, authextra={"rlink": link_id})
        self._tracker = self.config.extra["tracker"]

    @inlineCallbacks
    def onJoin(self, details):
        """
        Set up the forwarding enabled in ``config.extra`` towards the remote
        leg, then fire the ``on_ready`` deferred (once) with this session.

        :param details: Session details; realm, authid, authrole and session
            are read for logging.
        """
        extra = self.config.extra
        assert extra and "on_ready" in extra
        assert extra and "other" in extra

        remote_leg = extra["other"]
        assert isinstance(remote_leg, RLinkRemoteSession)

        self._exclude_authid = extra.get("exclude_authid", None)
        self._exclude_authrole = extra.get("exclude_authrole", None)

        # setup local->remote event forwarding
        want_events = extra.get("forward_events", False)
        if want_events:
            yield self._setup_event_forwarding(remote_leg)

        # setup local->remote invocation forwarding
        want_invocations = extra.get("forward_invocations", False)
        if want_invocations:
            yield self._setup_invocation_forwarding(remote_leg)

        self.log.debug(
            "Router link local session ready (forward_events={forward_events}, forward_invocations={forward_invocations}, realm={realm}, authid={authid}, authrole={authrole}, session={session}) {method}",
            method=hltype(RLinkLocalSession.onJoin),
            forward_events=hluserid(want_events),
            forward_invocations=hluserid(want_invocations),
            realm=hluserid(details.realm),
            authid=hluserid(details.authid),
            authrole=hluserid(details.authrole),
            session=hlid(details.session),
        )

        # signal readiness exactly once
        ready = extra.get("on_ready", None)
        if ready and not ready.called:
            ready.callback(self)

    def onLeave(self, details):
        """
        Log that the local leg went down, then run the base class handling.

        :param details: Close details (logged only).
        """
        self.log.warn(
            "Router link local session down! (realm={realm}, authid={authid}, authrole={authrole}, session={session}, details={details}) {method}",
            method=hltype(RLinkLocalSession.onLeave),
            realm=hluserid(self.config.realm),
            authid=hluserid(self._authid),
            authrole=hluserid(self._authrole),
            details=details,
            session=hlid(self._session_id),
        )
        BridgeSession.onLeave(self, details)
class RLinkRemoteSession(BridgeSession):
    """
    This session is the remote leg of the router-to-router link.
    """

    log = make_logger()

    IS_REMOTE_LEG = True

    # direction in which events are flowing (published) via this session
    DIR = hl("from local to remote", color="yellow", bold=True)

    def __init__(self, config):
        BridgeSession.__init__(self, config)

        # import here to resolve import dependency issues
        from crossbar.worker.router import RouterController

        self._rlink_manager: RLinkManager = self.config.extra["rlink_manager"]
        self._router_controller: RouterController = self._rlink_manager.controller

    # FIXME: async? see below
    def onConnect(self):
        """
        Fetch the public key via the router controller and, once it is
        available, join the remote realm using WAMP-cryptosign.

        :return: The deferred returned by ``get_public_key()``, with the join
            step attached as a callback.
        """
        self.log.info("{func}() ...", func=hltype(self.onConnect))

        authid = self.config.extra.get("authid", None)
        authrole = self.config.extra.get("authrole", None)
        # work on a copy so the configured authextra is not modified in place
        authextra = dict(self.config.extra.get("authextra", None) or {})

        # FIXME: use cryptosign-proxy
        authmethods = ["cryptosign"]

        # use WorkerController.get_public_key to call node controller
        # FIXME: the following does _not_ work with onConnect (?!)
        # _public_key = await self._router_controller.get_public_key()

        def actually_join(_public_key):
            authextra.update(
                {
                    # forward the client pubkey: this allows us to omit authid as
                    # the router can identify us with the pubkey already
                    "pubkey": _public_key,
                    # not yet implemented. a public key the router should provide
                    # a trustchain for its public key. the trustroot can eg be
                    # hard-coded in the client, or come from a command line option.
                    "trustroot": None,
                    # not yet implemented. for authenticating the router, this
                    # challenge will need to be signed by the router and send back
                    # in AUTHENTICATE for client to verify. A string with a hex
                    # encoded 32 bytes random value.
                    "challenge": None,
                    # https://tools.ietf.org/html/rfc5929
                    "channel_binding": "tls-unique",
                }
            )
            self.log.info(
                '{func} joining with realm="{realm}", authmethods={authmethods}, authid="{authid}", authrole="{authrole}", authextra={authextra}',
                func=hltype(self.onConnect),
                realm=hlval(self.config.realm),
                authmethods=hlval(authmethods),
                authid=hlval(authid),
                authrole=hlval(authrole),
                authextra=authextra,
            )
            self.join(
                self.config.realm, authmethods=authmethods, authid=authid, authrole=authrole, authextra=authextra
            )

        res = self._router_controller.get_public_key()
        res.addCallback(actually_join)
        self.log.info("{func}() done (res={res}).", func=hltype(self.onConnect), res=res)
        return res

    # FIXME: async? see below
    def onChallenge(self, challenge):
        """
        Answer a WAMP-cryptosign challenge by letting the router controller
        sign it.

        :param challenge: The challenge received from the remote router.
        :return: Whatever ``sign_challenge()`` of the router controller returns.
        :raises Exception: If the challenge is for any method other than
            ``"cryptosign"``.
        """
        self.log.debug("{func}(challenge={challenge})", func=hltype(self.onChallenge), challenge=challenge)

        if challenge.method != "cryptosign":
            raise Exception(
                "internal error: we asked to authenticate using wamp-cryptosign, but now received a challenge for {}".format(
                    challenge.method
                )
            )

        # alright, we've got a challenge from the router.
        # sign the challenge with our private key.
        channel_id_type = "tls-unique"
        # NOTE(review): channel_id is presumably None on transports without
        # channel binding - treat that like an empty map; confirm.
        channel_id_map = self._router_controller._transport.transport_details.channel_id or {}
        if channel_id_type in channel_id_map:
            channel_id = channel_id_map[channel_id_type]
        else:
            channel_id = None
            channel_id_type = None

        # use WorkerController.get_public_key to call node controller
        # FIXME: await?
        signed_challenge = self._router_controller.sign_challenge(challenge, channel_id, channel_id_type)

        # send back the signed challenge for verification
        return signed_challenge

    @inlineCallbacks
    def onJoin(self, details):
        """
        Mark the link as connected, set up the forwarding enabled in
        ``config.extra`` towards the local leg, then fire the ``on_ready``
        deferred (once) with this session.

        :param details: Session details; realm, authid, authrole and session
            are read for logging.
        """
        self.log.debug("{klass}.onJoin(details={details})", klass=self.__class__.__name__, details=details)

        assert self.config.extra and "on_ready" in self.config.extra
        assert self.config.extra and "other" in self.config.extra

        local = self.config.extra["other"]
        assert isinstance(local, RLinkLocalSession)
        local._tracker.connected = True

        self._exclude_authid = self.config.extra.get("exclude_authid", None)
        self._exclude_authrole = self.config.extra.get("exclude_authrole", None)

        # setup remote->local event forwarding
        forward_events = self.config.extra.get("forward_events", False)
        if forward_events:
            yield self._setup_event_forwarding(local)

        # setup remote->local invocation forwarding
        forward_invocations = self.config.extra.get("forward_invocations", False)
        if forward_invocations:
            yield self._setup_invocation_forwarding(local)

        self.log.info(
            "{klass}.onJoin(): rlink remote session ready (forward_events={forward_events}, forward_invocations={forward_invocations}, realm={realm}, authid={authid}, authrole={authrole}, session={session}) {method}",
            klass=self.__class__.__name__,
            method=hltype(RLinkRemoteSession.onJoin),
            forward_events=hluserid(forward_events),
            forward_invocations=hluserid(forward_invocations),
            realm=hluserid(details.realm),
            authid=hluserid(details.authid),
            authrole=hluserid(details.authrole),
            session=hlid(details.session),
        )

        # we are ready!
        on_ready = self.config.extra.get("on_ready", None)
        if on_ready and not on_ready.called:
            on_ready.callback(self)

    @inlineCallbacks
    def onLeave(self, details):
        """
        Tear down everything this leg mirrored on the local leg, mark the link
        as disconnected and run the base class handling.

        :param details: Close details (logged and passed on to the base class).
        """
        # When the rlink is going down, make sure to unsubscribe to
        # all events that are subscribed on the local-leg.
        # This avoids duplicate events that would otherwise arrive
        # See: https://github.com/crossbario/crossbar/issues/1916
        #
        # Iterate over snapshots: the maps can change while we yield. An entry
        # whose "sub"/"reg" is still None was never completed and needs no
        # teardown.
        for record in list(self._subs.values()):
            sub = record.get("sub")
            if sub and sub.active:
                yield sub.unsubscribe()
        self._subs = {}

        for record in list(self._regs.values()):
            reg = record.get("reg")
            if reg and reg.active:
                yield reg.unregister()
        self._regs = {}

        self.config.extra["other"]._tracker.connected = False

        self.log.warn(
            "{klass}.onLeave(): rlink remote session left! (realm={realm}, authid={authid}, authrole={authrole}, session={session}, details={details}) {method}",
            klass=self.__class__.__name__,
            method=hltype(RLinkRemoteSession.onLeave),
            realm=hluserid(self.config.realm),
            authid=hluserid(self._authid),
            authrole=hluserid(self._authrole),
            session=hlid(self._session_id),
            details=details,
        )

        BridgeSession.onLeave(self, details)
class RLink(object):
    """
    Run-time record of one router-to-router link: its configuration, who
    started it and when, its two sessions and its connection state.
    """

    def __init__(self, id, config, started=None, started_by=None, local=None, remote=None):
        """
        :param id: The link ID.
        :type id: str
        :param config: The link configuration.
        :type config: :class:`RLinkConfig`
        :param started: When the link was started (epoch time in ns).
        :type started: int or None
        :param started_by: Who started this link.
        :type started_by: :class:`autobahn.wamp.types.SessionIdent` or None
        :param local: The local leg of the link.
        :type local: :class:`RLinkLocalSession` or None
        :param remote: The remote leg of the link.
        :type remote: :class:`RLinkRemoteSession` or None
        """
        assert isinstance(id, str)
        assert isinstance(config, RLinkConfig)
        assert started is None or isinstance(started, int)
        assert started_by is None or isinstance(started_by, SessionIdent)
        assert local is None or isinstance(local, RLinkLocalSession)
        assert remote is None or isinstance(remote, RLinkRemoteSession)

        # link ID
        self.id = id

        # link config: RLinkConfig
        self.config = config

        # when was it started: epoch time in ns
        self.started = started

        # who started this link: SessionIdent
        self.started_by = started_by

        # local session: RLinkLocalSession
        self.local = local

        # remote session: RLinkRemoteSession
        self.remote = remote

        # updated by the session
        self.connected = False

    def __str__(self):
        return pprint.pformat(self.marshal())

    def marshal(self):
        """
        Return a plain dict describing this link (ID, marshalled config,
        start time, marshalled starter and connection state).

        :rtype: dict
        """
        obj = {
            "id": self.id,
            "config": self.config.marshal() if self.config else None,
            "started": self.started,
            "started_by": self.started_by.marshal() if self.started_by else None,
            "connected": self.connected,
        }
        return obj
class RLinkConfig(object):
    """
    Configuration of one router-to-router link.
    """

    def __init__(
        self,
        realm,
        transport,
        authid,
        exclude_authid,
        forward_local_events,
        forward_remote_events,
        forward_local_invocations,
        forward_remote_invocations,
    ):
        """
        :param realm: The remote router realm.
        :type realm: str
        :param transport: The transport for connecting to the remote router.
        :type transport:
        :param authid: The authid to use for the link (may be ``None``).
        :param exclude_authid: Authids to exclude when forwarding.
        :param forward_local_events: Whether the local leg forwards events.
        :param forward_remote_events: Whether the remote leg forwards events.
        :param forward_local_invocations: Whether the local leg forwards invocations.
        :param forward_remote_invocations: Whether the remote leg forwards invocations.
        """
        self.realm = realm
        self.transport = transport
        self.authid = authid
        self.exclude_authid = exclude_authid
        self.forward_local_events = forward_local_events
        self.forward_remote_events = forward_remote_events
        self.forward_local_invocations = forward_local_invocations
        self.forward_remote_invocations = forward_remote_invocations

    def __str__(self):
        return pprint.pformat(self.marshal())

    def marshal(self):
        """
        Return this configuration as a plain dict, one key per attribute.

        :rtype: dict
        """
        obj = {
            "realm": self.realm,
            "transport": self.transport,
            "authid": self.authid,
            "exclude_authid": self.exclude_authid,
            "forward_local_events": self.forward_local_events,
            "forward_remote_events": self.forward_remote_events,
            "forward_local_invocations": self.forward_local_invocations,
            "forward_remote_invocations": self.forward_remote_invocations,
        }
        return obj

    @staticmethod
    def parse(personality, obj, id=None):
        """
        Parses a generic object (eg a dict) into a typed
        object of this class.

        :param personality: Passed through to ``check_connecting_transport``.
        :param obj: The generic object to parse. When ``id`` is given, it is
            stored into ``obj["id"]`` before validation.
        :type obj: dict
        :param id: Optional link ID.
        :type id: str or None

        :returns: Router link configuration
        :rtype: :class:`crossbar.edge.worker.rlink.RLinkConfig`
        """
        # assert isinstance(personality, Personality)
        assert isinstance(obj, dict)
        assert id is None or isinstance(id, str)

        if id:
            obj["id"] = id

        check_dict_args(
            {
                "id": (False, [str]),
                "realm": (True, [str]),
                "transport": (True, [Mapping]),
                "authid": (False, [str]),
                "exclude_authid": (False, [Sequence]),
                "forward_local_events": (False, [bool]),
                "forward_remote_events": (False, [bool]),
                "forward_local_invocations": (False, [bool]),
                "forward_remote_invocations": (False, [bool]),
            },
            obj,
            "router link configuration",
        )

        realm = obj["realm"]
        authid = obj.get("authid", None)
        exclude_authid = obj.get("exclude_authid", [])
        for aid in exclude_authid:
            assert isinstance(aid, str)

        # all four forwarding directions are enabled unless configured otherwise
        forward_local_events = obj.get("forward_local_events", True)
        forward_remote_events = obj.get("forward_remote_events", True)
        forward_local_invocations = obj.get("forward_local_invocations", True)
        forward_remote_invocations = obj.get("forward_remote_invocations", True)
        transport = obj["transport"]

        check_realm_name(realm)
        check_connecting_transport(personality, transport)

        config = RLinkConfig(
            realm=realm,
            transport=transport,
            authid=authid,
            exclude_authid=exclude_authid,
            forward_local_events=forward_local_events,
            forward_remote_events=forward_remote_events,
            forward_local_invocations=forward_local_invocations,
            forward_remote_invocations=forward_remote_invocations,
        )
        return config
class RLinkManager(object):
    """
    Router-to-router links manager.

    Behaves like a read-only mapping from link ID to :class:`RLink`.
    """

    # start_link() hands this logger to the endpoint factory
    log = make_logger()

    def __init__(self, realm, controller):
        """
        :param realm: The (local) router realm this object is managing links for.
        :param controller: The router controller this rlink is running under.
        """
        # import here to resolve import dependency issues
        from crossbar.edge.worker.router import ExtRouterRealm
        from crossbar.worker.router import RouterController

        self._realm: ExtRouterRealm = realm
        self._controller: RouterController = controller

        # map: link_id -> RLink
        self._links: Dict[str, RLink] = {}

    @property
    def realm(self):
        return self._realm

    @property
    def controller(self):
        return self._controller

    def __getitem__(self, link_id):
        if link_id in self._links:
            return self._links[link_id]
        else:
            raise KeyError('no router link with ID "{}"'.format(link_id))

    def __contains__(self, link_id):
        return link_id in self._links

    def __len__(self):
        return len(self._links)

    def __setitem__(self, item, value):
        raise Exception("__setitem__ not supported on this class")

    def __delitem__(self, item):
        raise Exception("__delitem__ not supported on this class")

    def keys(self):
        return self._links.keys()

    @inlineCallbacks
    def start_link(self, link_id, link_config, caller):
        """
        Start a new router link: create both legs, connect the local one to
        the local router and the remote one to the configured transport, and
        wait until both are ready.

        :param link_id: ID of the link to start.
        :type link_id: str
        :param link_config: Configuration of the link.
        :type link_config: :class:`RLinkConfig`
        :param caller: The session that asked for the link; stored as
            ``started_by`` on the link.
        :type caller: :class:`autobahn.wamp.types.SessionIdent`

        :returns: The established link.
        :rtype: :class:`RLink`
        :raises ApplicationError: ``crossbar.error.already_running`` if a link
            with this ID exists already.
        """
        assert isinstance(link_id, str)
        assert isinstance(link_config, RLinkConfig)
        assert isinstance(caller, SessionIdent)

        if link_id in self._links:
            raise ApplicationError("crossbar.error.already_running", "router link {} already running".format(link_id))

        # setup local session
        #
        local_extra = {
            "other": None,
            "on_ready": Deferred(),
            "rlink": link_id,
            "forward_events": link_config.forward_local_events,
            "forward_invocations": link_config.forward_local_invocations,
        }
        local_realm = self._realm.config["name"]

        local_authid = link_config.authid or util.generate_serial_number()
        local_authrole = "trusted"
        local_config = ComponentConfig(local_realm, local_extra)
        local_session = RLinkLocalSession(local_config)

        # setup remote session
        #
        remote_extra = {
            "rlink_manager": self,
            "other": None,
            "on_ready": Deferred(),
            "authid": link_config.authid,
            "exclude_authid": link_config.exclude_authid,
            "forward_events": link_config.forward_remote_events,
            "forward_invocations": link_config.forward_remote_invocations,
        }
        remote_realm = link_config.realm
        remote_config = ComponentConfig(remote_realm, remote_extra)
        remote_session = RLinkRemoteSession(remote_config)

        # cross-connect the two sessions
        #
        local_extra["other"] = remote_session
        remote_extra["other"] = local_session

        # the rlink
        #
        rlink = RLink(link_id, link_config)
        self._links[link_id] = rlink
        # the local session reads this in onConnect(); the remote session
        # flips its "connected" flag via the local session
        local_extra["tracker"] = rlink

        # create connecting client endpoint
        #
        connecting_endpoint = create_connecting_endpoint_from_config(
            link_config.transport["endpoint"], self._controller.cbdir, self._controller._reactor, self.log
        )
        try:
            # connect the local session
            #
            self._realm.controller._router_session_factory.add(
                local_session, self._realm.router, authid=local_authid, authrole=local_authrole, authextra=local_extra
            )

            yield local_extra["on_ready"]

            # connect the remote session
            #
            # remote connection parameters to ApplicationRunner:
            #
            # url: The WebSocket URL of the WAMP router to connect to (e.g. ws://somehost.com:8090/somepath)
            # realm: The WAMP realm to join the application session to.
            # extra: Optional extra configuration to forward to the application component.
            # serializers: List of :class:`autobahn.wamp.interfaces.ISerializer` (or None for default serializers).
            # ssl: None or :class:`twisted.internet.ssl.CertificateOptions`
            # proxy: Explicit proxy server to use; a dict with ``host`` and ``port`` keys
            # headers: Additional headers to send (only applies to WAMP-over-WebSocket).
            # max_retries: Maximum number of reconnection attempts. Unlimited if set to -1.
            # initial_retry_delay: Initial delay for reconnection attempt in seconds (Default: 1.0s).
            # max_retry_delay: Maximum delay for reconnection attempts in seconds (Default: 60s).
            # retry_delay_growth: The growth factor applied to the retry delay between reconnection attempts (Default 1.5).
            # retry_delay_jitter: A 0-argument callable that introduces noise into the delay. (Default random.random)
            #
            remote_runner = ApplicationRunner(url=link_config.transport["url"], realm=remote_realm, extra=remote_extra)

            yield remote_runner.run(
                remote_session,
                start_reactor=False,
                auto_reconnect=True,
                endpoint=connecting_endpoint,
                reactor=self._controller._reactor,
            )

            yield remote_extra["on_ready"]

        except BaseException:
            # make sure to remove the half-initialized link from our map
            # (pop, so the cleanup itself can never mask the original error) ..
            self._links.pop(link_id, None)

            # .. and then re-raise
            raise

        # the router link is established: store final infos
        rlink.started = time_ns()
        rlink.started_by = caller
        rlink.local = local_session
        rlink.remote = remote_session

        return rlink

    @inlineCallbacks
    def stop_link(self, link_id, caller):
        """
        Stop a running router link (not implemented yet).

        :raises NotImplementedError: Always.
        """
        raise NotImplementedError()