##############################################################################
#
# Crossbar.io
# Copyright (C) typedef int GmbH. All rights reserved.
#
##############################################################################
import six
from autobahn import wamp
from autobahn.wamp.exception import ApplicationError
from autobahn.wamp.types import PublishOptions
from twisted.internet import reactor
from txaio import make_logger
from crossbar._util import hl, hltype
from crossbar.edge.worker.tracing import FabricRouterTrace
from crossbar.router.router import Router, RouterFactory
from crossbar.worker.router import RouterController
from crossbar.worker.types import RouterRealm
__all__ = ("ExtRouterController",)
class RouterRealmInterface(object):
def __init__(self, id, config):
self.id = id
self.config = config
class RouterInterface(object):
def __init__(self, router, uri):
self.router = router
self.uri = uri
class ExtRouter(Router):
"""
Router extended with crossbar features.
"""
def __init__(self, factory, realm, options=None, store=None):
Router.__init__(self, factory, realm, options, store)
self._interfaces = {}
def has_interface(self, uri):
return uri in self._interfaces
def add_interface(self, interface):
overwritten = interface.uri in self._interfaces
self._interfaces[interface.uri] = interface
return overwritten
def drop_interface(self, interface):
if interface.uri in self._interfaces:
del self._interfaces[interface.uri]
return True
return False
class ExtRouterFactory(RouterFactory):
"""
Router factory extended with crossbar features.
"""
router = ExtRouter # type: ignore
def __init__(self, node_id, worker, options=None):
RouterFactory.__init__(self, node_id, worker, options=options)
assert isinstance(ExtRouter, Router)
self._routers: dict[str, ExtRouter] = {}
def add_interface(self, realm, interface):
assert isinstance(realm, six.text_type)
assert realm in self._routers
router_ = self._routers[realm]
router_.add_interface(RouterInterface(router_, interface["uri"]))
def drop_interface(self, realm, interface_id):
assert isinstance(realm, six.text_type)
assert isinstance(interface_id, six.text_type)
if realm not in self._routers:
raise Exception('no router started for realm "{}"'.format(realm))
router = self._routers[realm]
if interface_id not in router._interfaces:
raise Exception('no interface "{}" started on router for realm "{}"'.format(interface_id, realm))
interface_id = router._interfaces[interface_id]
router.drop_interface(interface_id)
class ExtRouterRealm(RouterRealm):
"""
Router realm run-time representation, extended with crossbar features:
1. router links
2. interfaces
"""
def __init__(self, controller, realm_id, config, router=None, session=None):
"""
:param controller:
:param realm_id: The realm ID within the router.
:type realm_id: str
:param config: The realm configuration.
:type config: dict
:param router: The router (within the router worker) serving the realm.
:type router: :class:`crossbar.edge.worker.router.ExtRouter`
:param session: The realm service session.
:type session: :class:`crossbar.router.service.RouterServiceAgent`
"""
RouterRealm.__init__(self, controller, realm_id, config, router=router, session=session)
# FIXME
self.interfaces = {}
def marshal(self):
marshalled = RouterRealm.marshal(self)
# FIXME
marshalled["interfaces"] = self.interfaces
return marshalled
[docs]
class ExtRouterController(RouterController):
"""
Controller session for crossbar router workers.
"""
def __init__(self, config=None, reactor=None, personality=None):
RouterController.__init__(self, config=config, reactor=reactor, personality=personality)
# router factory / realm classes to be used
[docs]
self.router_factory_class = ExtRouterFactory
[docs]
self.router_realm_class = ExtRouterRealm
# map: trace ID -> RouterTrace
# when users don't provide a trace_id, draw from this enumerator
@wamp.register(None)
[docs]
def start_router_realm(self, realm_id, realm_config, details=None):
self.log.info(
'Starting router realm "{realm_id}" {method}',
realm_id=realm_id,
method=hltype(ExtRouterController.start_router_realm),
)
# activate this to test:
if False and realm_config["name"] == "realm1":
self.log.info(hl("Auto-renaming realm1 to realm001", color="green", bold=True))
realm_config["name"] = "realm001"
return RouterController.start_router_realm(self, realm_id, realm_config, details)
[docs]
def _next_trace_id(self):
while True:
trace_id = "trace{}".format(self._next_trace)
self._next_trace += 1
if trace_id not in self._traces:
return trace_id
[docs]
def _maybe_trace_rx_msg(self, session, msg):
if self._traces:
for trace in self._traces.values():
trace.maybe_trace_rx_msg(session, msg)
[docs]
def _maybe_trace_tx_msg(self, session, msg):
if self._traces:
for trace in self._traces.values():
trace.maybe_trace_tx_msg(session, msg)
@wamp.register(None)
[docs]
def get_traces(self, include_stopped=False, details=None):
self.log.debug('get_traces(inclue_stopped="{include_stopped}")', include_stopped=include_stopped)
return sorted(self._traces.keys())
@wamp.register(None)
[docs]
def get_trace(self, trace_id, details=None):
self.log.debug('get_trace(trace_id="{trace_id}")', trace_id=trace_id)
if trace_id not in self._traces:
raise ApplicationError("crossbar.error.no_such_object", "No trace with ID '{}'".format(trace_id))
return self._traces[trace_id].marshal()
@wamp.register(None)
[docs]
def start_trace(self, trace_id=None, trace_options=None, details=None):
self.log.info(
'start_trace(trace_id="{trace_id}", trace_options="{trace_options}")',
trace_id=trace_id,
trace_options=trace_options,
)
assert trace_id is None or isinstance(trace_id, six.text_type)
assert trace_options is None or isinstance(trace_options, dict)
trace_id = trace_id or self._next_trace_id()
trace_options = trace_options or {}
# tracing level: u'message' (default) or u'action'
trace_level = trace_options.get("trace_level", "message")
if trace_level not in ["message", "action"]:
emsg = 'invalid tracing options: trace_level must be one of ["message", "action"], but was "{}"'.format(
trace_level
)
self.log.error(emsg)
raise ApplicationError("crossbar.error.invalid_configuration", emsg)
# flag to control tracing of app _payload_
trace_app_payload = trace_options.get("trace_app_payload", False)
if not isinstance(trace_app_payload, bool):
emsg = "invalid tracing options: trace_app_payload must be of type bool, was {}".format(
type(trace_app_payload)
)
self.log.error(emsg)
raise ApplicationError("crossbar.error.invalid_configuration", emsg)
# tracing app payload only makes sense for trace_level == u'message'
if trace_app_payload and trace_level != "message":
emsg = 'invalid tracing options: when trace_app_payload is set, trace_level must be "message", but was "{}"'.format(
trace_level
)
self.log.error(emsg)
raise ApplicationError("crossbar.error.invalid_configuration", emsg)
# parameter to control batching of trace records (in ms)
batching_period = trace_options.get("batching_period", 200)
if type(batching_period) not in six.integer_types or batching_period < 10 or batching_period > 10000:
emsg = 'invalid tracing options: batching_period must be an integer [10, 10000], was "{}"'.format(
type(batching_period)
)
self.log.error(emsg)
raise ApplicationError("crossbar.error.invalid_configuration", emsg)
# flag to control tracing persistence
persist = trace_options.get("persist", False)
if not isinstance(persist, bool):
emsg = "invalid tracing options: persist must be of type bool, was {}".format(type(persist))
self.log.error(emsg)
raise ApplicationError("crossbar.error.invalid_configuration", emsg)
# parameter to control trace duration (in secs): if given, automatically stop the trace
# after the given period of time. if not given, the trace runs until stopped explicitly
duration = trace_options.get("duration", None)
if duration is not None and (type(duration) not in six.integer_types or duration < 1 or duration > 86400):
emsg = 'invalid tracing options: duration must be an integer [1, 86400], was "{}"'.format(type(duration))
self.log.error(emsg)
raise ApplicationError("crossbar.error.invalid_configuration", emsg)
# check user provided trace_id
if trace_id in self._traces:
emsg = 'could not start trace: a trace with ID "{}" is already running (or starting)'.format(trace_id)
self.log.error(emsg)
raise ApplicationError("crossbar.error.already_running", emsg)
def on_trace_period_finished(trace_id, period, trace_batch):
if trace_level == "message":
trace_data = [trace_record.marshal(self._trace_app_payload) for trace_record in trace_batch]
elif trace_level == "action":
trace_data = [traced_action.marshal() for traced_action in trace_batch]
else:
raise Exception("logic error")
self.publish("{}.on_trace_data".format(self._uri_prefix), trace_id, period, trace_data)
trace = FabricRouterTrace(
self,
trace_id,
on_trace_period_finished=on_trace_period_finished,
trace_level=trace_level,
trace_app_payload=trace_app_payload,
batching_period=batching_period,
persist=persist,
duration=duration,
)
trace.start()
self._traces[trace_id] = trace
if duration:
def maybe_stop():
if trace_id in self._traces and self._traces[trace_id]._status == "running":
self.stop_trace(trace_id, details=details)
reactor.callLater(float(duration), maybe_stop)
trace_started = trace.marshal()
self.publish("{}.on_trace_started".format(self._uri_prefix), trace_id, trace_started)
return trace_started
@wamp.register(None)
[docs]
def stop_trace(self, trace_id, details=None):
self.log.info('stop_trace(trace_id="{trace_id}")', trace_id=trace_id)
if trace_id not in self._traces:
raise ApplicationError("crossbar.error.no_such_object", "No trace with ID '{}'".format(trace_id))
trace = self._traces[trace_id]
trace.stop()
del self._traces[trace_id]
trace_stopped = trace.marshal()
self.publish("{}.on_trace_stopped".format(self._uri_prefix), trace_id, trace_stopped)
return trace_stopped
@wamp.register(None)
[docs]
def get_trace_data(self, trace_id, from_seq, to_seq=None, limit=None, details=None):
self.log.debug(
'get_trace_data(trace_id="{trace_id}", from_seq="{from_seq})", to_seq="{to_seq}", limit="{limit}")',
trace_id=trace_id,
from_seq=from_seq,
to_seq=to_seq,
limit=limit,
)
if trace_id not in self._traces:
raise ApplicationError("crossbar.error.no_such_object", "No trace with ID '{}'".format(trace_id))
limit = limit or 100
if limit > 10000:
raise Exception("limit too large")
return self._traces[trace_id].get_data(from_seq, to_seq, limit)
@wamp.register(None)
[docs]
def start_router_realm_interface(self, realm_id, interface_id, interface_config, details=None):
if realm_id not in self.realms:
raise ApplicationError("crossbar.error.no_such_object", "No realm with ID '{}'".format(realm_id))
if interface_id in self.realms[realm_id].interfaces:
raise ApplicationError(
"crossbar.error.already_exists",
"An interface with ID '{}' already exists in realm with ID '{}'".format(interface_id, realm_id),
)
self.realms[realm_id].interfaces[interface_id] = RouterRealmInterface(interface_id, interface_config)
realm = self.realms[realm_id].config["name"]
self._router_factory.add_interface(realm, interface_config)
topic = "{}.on_router_realm_interface_started".format(self._uri_prefix)
event = {"id": interface_id}
caller = details.caller if details else None
self.publish(topic, event, options=PublishOptions(exclude=caller))
@wamp.register(None)
[docs]
def stop_router_realm_interface(self, realm_id, interface_id, details=None):
if realm_id not in self.realms:
raise ApplicationError("crossbar.error.no_such_object", "No realm with ID '{}'".format(realm_id))
if interface_id not in self.realms[realm_id].interfaces:
raise ApplicationError(
"crossbar.error.no_such_object",
"No interface with ID '{}' in realm with ID '{}'".format(interface_id, realm_id),
)
del self.realms[realm_id].interfaces[interface_id]
@wamp.register(None)
[docs]
def get_router_realm_interface(self, realm_id, details=None):
if realm_id not in self.realms:
raise ApplicationError("crossbar.error.no_such_object", "No realm with ID '{}'".format(realm_id))
return self.realms[realm_id].interfaces.values()