#####################################################################################
#
# Copyright (c) typedef int GmbH
# SPDX-License-Identifier: EUPL-1.2
#
#####################################################################################
from datetime import datetime
import twisted
from autobahn import wamp
from autobahn.util import utcstr
from autobahn.wamp import ApplicationError, PublishOptions
from twisted.internet.defer import inlineCallbacks, maybeDeferred, returnValue
from txaio import make_logger
import crossbar
from crossbar._util import hlid, hltype, hlval
from crossbar.bridge.mqtt.wamp import WampMQTTServerFactory
from crossbar.common.twisted.endpoint import create_listening_port_from_config
from crossbar.common.twisted.web import Site
from crossbar.router.protocol import WampRawSocketServerFactory, WampWebSocketServerFactory
from crossbar.router.unisocket import UniSocketServerFactory
from crossbar.webservice.flashpolicy import FlashPolicyFactory
from crossbar.worker.controller import WorkerController
from crossbar.worker.testee import StreamTesteeServerFactory, WebSocketTesteeServerFactory
# Monkey patch the Twisted Web server identification: replace the module-level
# version string of twisted.web.server with "Crossbar/<crossbar version>".
# NOTE(review): only the top-level "twisted" package is imported by name above;
# this presumably relies on twisted.web.server having been loaded as a side
# effect of one of the crossbar imports - confirm.
twisted.web.server.version = "Crossbar/{}".format(crossbar.__version__)
class RouterTransport(object):
    """
    A (listening) transport running on a router worker.

    A transport is created in state ``STATE_CREATED``, moves through
    ``STATE_STARTING`` to ``STATE_STARTED`` in :meth:`start`, and through
    ``STATE_STOPPING`` to ``STATE_STOPPED`` in :meth:`stop`. A failure while
    starting or stopping leaves it in ``STATE_FAILED``.
    """

    # NOTE(review): the definitions of these constants were missing from this
    # listing; distinct integers 1..6 are assumed. The values surface in
    # marshal()["state"], so confirm them against the upstream source.
    STATE_CREATED = 1
    STATE_STARTING = 2
    STATE_STARTED = 3
    STATE_FAILED = 4
    STATE_STOPPING = 5
    STATE_STOPPED = 6

    # map: state constant -> human readable state name
    STATES = {
        STATE_CREATED: "created",
        STATE_STARTING: "starting",
        STATE_STARTED: "started",
        STATE_FAILED: "failed",
        STATE_STOPPING: "stopping",
        STATE_STOPPED: "stopped",
    }

    log = make_logger()

    def __init__(self, worker, transport_id, config):
        """
        Validate the configuration and initialize the (not yet listening) transport.

        :param worker: The (router) worker session the transport is created from.
        :type worker: crossbar.worker.router.RouterController

        :param transport_id: The transport ID within the router.
        :type transport_id: str

        :param config: The transport's configuration.
        :type config: dict

        :raises autobahn.wamp.ApplicationError: with URI
            ``crossbar.error.invalid_configuration`` if the worker personality
            rejects the configuration.
        """
        self._worker = worker
        self._transport_id = transport_id

        # let the personality of the worker validate the configuration first
        try:
            self._worker.personality.check_router_transport(self._worker.personality, config)
        except Exception as e:
            emsg = "Invalid router transport configuration: {}".format(e)
            self.log.error(emsg)
            raise ApplicationError("crossbar.error.invalid_configuration", emsg)
        else:
            self.log.debug(
                "Router transport parsed successfully (transport_id={transport_id}, transport_type={transport_type})",
                transport_id=transport_id,
                transport_type=config["type"],
            )

        self._config = config
        self._type = config["type"]
        self._cbdir = self._worker.config.extra.cbdir
        self._templates = self._worker.templates()
        self._created_at = datetime.utcnow()
        self._listening_since = None
        self._state = RouterTransport.STATE_CREATED
        self._transport_factory = None
        self._root_webservice = None

        # twisted.internet.interfaces.IListeningPort
        self._port = None

    def marshal(self):
        """
        :return: A dict describing this transport (ID, type, configuration,
            creation time, listening start time or ``None``, and state).
        """
        return {
            "id": self._transport_id,
            "type": self._type,
            "config": self._config,
            "created_at": utcstr(self._created_at),
            "listening_since": utcstr(self._listening_since) if self._listening_since else None,
            "state": self._state,
        }

    @property
    def root(self):
        """
        :return: The root (on path "/") Web service.
        """
        return self._root_webservice

    @property
    def worker(self):
        """
        :return: The worker (controller session) this transport was created from.
        """
        return self._worker

    @property
    def id(self):
        """
        :return: The transport ID.
        """
        return self._transport_id

    @property
    def type(self):
        """
        :return: The transport type.
        """
        return self._type

    @property
    def cbdir(self):
        """
        :return: Node directory.
        """
        return self._cbdir

    @property
    def templates(self):
        """
        :return: Templates directory.
        """
        return self._templates

    @property
    def config(self):
        """
        :return: The original configuration as supplied to this router transport.
        """
        return self._config

    @property
    def created(self):
        """
        :return: When this transport was created (the run-time, in-memory object instantiated).
        """
        return self._created_at

    @property
    def state(self):
        """
        :return: The state of this transport.
        """
        return self._state

    @property
    def port(self):
        """
        :return: The network listening transport of this router transport.
        """
        return self._port

    @inlineCallbacks
    def start(self, start_children=False, ignore=None):
        """
        Start this transport (starts listening on the respective network listening port).

        :param start_children: Passed through to the factory creation as ``create_paths``.
        :param ignore: Transport types for which no factory should be created
            here (used by subclasses). Defaults to no types.

        :return: A Deferred that fires with this transport once it is listening.
        :raises Exception: If the transport is not in state ``STATE_CREATED``.
        """
        if self._state != RouterTransport.STATE_CREATED:
            raise Exception("invalid state")

        # always hand a real list downstream, so overriding implementations
        # can keep relying on a container being passed
        ignore = [] if ignore is None else ignore

        # note that we are starting ..
        self._state = RouterTransport.STATE_STARTING

        try:
            # create transport factory
            self._transport_factory, self._root_webservice = yield self._create_factory(start_children, ignore)

            # create transport endpoint / listening port from transport factory
            port = yield create_listening_port_from_config(
                self._config["endpoint"],
                self._cbdir,
                self._transport_factory,
                self._worker._reactor,
                self.log,
            )
        except Exception:
            # do not leave the transport stuck in "starting" when it could not be brought up
            self._state = RouterTransport.STATE_FAILED
            raise

        # when listening:
        self._port = port
        self._listening_since = datetime.utcnow()

        # note that we started.
        self._state = RouterTransport.STATE_STARTED

        returnValue(self)

    def _create_web_factory(self, create_paths=False, ignore=None):
        """
        Create the Twisted Web factory and root Web service. Not implemented in
        this base class; see subclasses providing Web support.
        """
        raise NotImplementedError("_create_web_factory")

    @inlineCallbacks
    def _create_factory(self, create_paths=False, ignore=None):
        """
        Create the Twisted (listening endpoint) transport factory for the configured type.

        :param create_paths: Passed through to :meth:`_create_web_factory`.
        :param ignore: Transport types that are accepted without creating a
            factory (this is to allow subclasses to reuse this method).

        :return: A Deferred firing with a tuple ``(transport_factory, root_webservice)``;
            ``root_webservice`` is only set for Web transports or Universal
            transports with Web support.
        """
        ignore = [] if ignore is None else ignore
        config = self._config
        transport_type = config["type"]

        # Twisted (listening endpoint) transport factory
        transport_factory = None

        # Root Web service: only set (down below) when running a Web transport or
        # a Universal transport with Web support
        root_webservice = None

        # standalone WAMP-RawSocket transport
        if transport_type == "rawsocket":
            transport_factory = WampRawSocketServerFactory(self._worker._router_session_factory, config)
            transport_factory.noisy = False

        # standalone WAMP-WebSocket transport
        elif transport_type == "websocket":
            assert self._templates
            transport_factory = WampWebSocketServerFactory(
                self._worker._router_session_factory, self._cbdir, config, self._templates
            )
            transport_factory.noisy = False

        # Flash-policy file server pseudo transport
        elif transport_type == "flashpolicy":
            transport_factory = FlashPolicyFactory(
                config.get("allowed_domain", None), config.get("allowed_ports", None)
            )

        # WebSocket testee pseudo transport
        elif transport_type == "websocket.testee":
            assert self._templates
            transport_factory = WebSocketTesteeServerFactory(config, self._templates)

        # Stream testee pseudo transport
        elif transport_type == "stream.testee":
            transport_factory = StreamTesteeServerFactory()

        # MQTT legacy adapter transport
        elif transport_type == "mqtt":
            transport_factory = WampMQTTServerFactory(
                self._worker._router_session_factory, config, self._worker._reactor
            )
            transport_factory.noisy = False

        # Twisted Web based transport
        elif transport_type == "web":
            assert self._templates
            transport_factory, root_webservice = yield maybeDeferred(self._create_web_factory, create_paths, ignore)

        # Universal transport: multiplexes Web, WebSocket, RawSocket and MQTT on one port
        elif transport_type == "universal":
            if "web" in config:
                assert self._templates
                web_factory, root_webservice = yield maybeDeferred(self._create_web_factory, create_paths, ignore)
            else:
                web_factory, root_webservice = None, None

            if "rawsocket" in config:
                rawsocket_factory = WampRawSocketServerFactory(
                    self._worker._router_session_factory, config["rawsocket"]
                )
                rawsocket_factory.noisy = False
            else:
                rawsocket_factory = None

            if "mqtt" in config:
                mqtt_factory = WampMQTTServerFactory(
                    self._worker._router_session_factory, config["mqtt"], self._worker._reactor
                )
                mqtt_factory.noisy = False
            else:
                mqtt_factory = None

            if "websocket" in config:
                assert self._templates
                # map: first component of request URI -> WebSocket factory
                websocket_factory_map = {}
                for websocket_url_first_component, websocket_config in config["websocket"].items():
                    websocket_transport_factory = WampWebSocketServerFactory(
                        self._worker._router_session_factory, self._cbdir, websocket_config, self._templates
                    )
                    websocket_transport_factory.noisy = False
                    websocket_factory_map[websocket_url_first_component] = websocket_transport_factory
                    self.log.debug(
                        "hooked up websocket factory on request URI {request_uri}",
                        request_uri=websocket_url_first_component,
                    )
            else:
                websocket_factory_map = None

            transport_factory = UniSocketServerFactory(
                web_factory, websocket_factory_map, rawsocket_factory, mqtt_factory
            )

        # this is to allow subclasses to reuse this method
        elif transport_type in ignore:
            pass

        # unknown transport type
        else:
            # should not arrive here, since we did check_transport() in the beginning
            raise Exception("logic error")

        returnValue((transport_factory, root_webservice))

    def stop(self):
        """
        Stops this transport (stops listening on the respective network port or interface).

        :return: A Deferred that fires when the port has stopped listening, or
            errbacks with the original failure if stopping failed (the
            transport is then in state ``STATE_FAILED``).
        :raises Exception: If the transport is not in state ``STATE_STARTED``.
        """
        if self._state != RouterTransport.STATE_STARTED:
            raise Exception("invalid state")

        self._state = RouterTransport.STATE_STOPPING

        # stopListening() may return a Deferred or complete immediately (None);
        # maybeDeferred normalizes both cases (and synchronous exceptions)
        d = maybeDeferred(self._port.stopListening)

        def ok(_):
            self._state = RouterTransport.STATE_STOPPED
            self._port = None

        def fail(err):
            self._state = RouterTransport.STATE_FAILED
            self._port = None
            # return (not raise) the failure, so the original error propagates
            # down the errback chain without being wrapped a second time
            return err

        d.addCallbacks(ok, fail)
        return d
class RouterWebTransport(RouterTransport):
    """
    Web transport or Universal transport with Web sub-service.
    """

    def __init__(self, worker, transport_id, config):
        RouterTransport.__init__(self, worker, transport_id, config)

    @inlineCallbacks
    def _create_web_factory(self, create_paths=False, ignore=None):
        """
        Create the root Web service and the Twisted Web site serving it.

        The root ("/") service configuration is taken from ``paths`` of the
        transport configuration, or from ``web.paths`` (Universal transport);
        if neither defines "/", an empty ``path`` service is used.

        :param create_paths: Not used by this implementation.
        :param ignore: Not used by this implementation.

        :return: A Deferred firing with a tuple ``(transport_factory, root_webservice)``.
        :raises Exception: If the personality has no Web service factory for
            the type of the root service.
        """
        # web transport options
        # NOTE(review): options are read from the top level of the transport
        # configuration only - confirm this is intended for universal transports.
        options = self._config.get("options", {})

        # determine configuration of the root web service
        if "/" in self._config.get("paths", {}):
            root_config = self._config["paths"]["/"]
        elif "/" in self._config.get("web", {}).get("paths", {}):
            root_config = self._config["web"]["paths"]["/"]
        else:
            root_config = {"type": "path", "paths": {}}

        # use .get() so that an unknown type hits the explicit error below
        # rather than a bare KeyError
        root_factory = self._worker.personality.WEB_SERVICE_FACTORIES.get(root_config["type"])
        if not root_factory:
            raise Exception('internal error: missing web service factory for type "{}"'.format(root_config["type"]))

        # create root web service
        root_webservice = yield maybeDeferred(root_factory.create, self, "/", root_config)
        self.log.info(
            'Created "{root_type}" Web service on root path "/" of Web transport "{transport_id}"',
            root_type=root_config["type"],
            transport_id=self.id,
        )

        # create the actual transport factory
        transport_factory = Site(
            root_webservice._resource,
            client_timeout=options.get("client_timeout", None),
            access_log=options.get("access_log", False),
            display_tracebacks=options.get("display_tracebacks", False),
            hsts=options.get("hsts", False),
            hsts_max_age=int(options.get("hsts_max_age", 31536000)),
        )

        returnValue((transport_factory, root_webservice))
def create_router_transport(worker, transport_id, config):
    """
    Factory for creating router (listening) transports.

    A :class:`RouterWebTransport` is created for transports of type ``web``
    and for ``universal`` transports with a non-empty ``web`` configuration;
    every other configuration yields a plain :class:`RouterTransport`.

    :param worker: The worker session the transport is created from (its
        ``log`` attribute is used for logging).
    :param transport_id: The transport ID within the router.
    :param config: The transport's configuration (must contain ``type``).

    :return: The newly created (not yet started) transport object.
    """
    worker.log.info('Creating router transport for "{transport_id}" ..', transport_id=transport_id)

    transport_type = config["type"]
    has_web = transport_type == "web" or (transport_type == "universal" and config.get("web", {}))
    transport_cls = RouterWebTransport if has_web else RouterTransport
    transport = transport_cls(worker, transport_id, config)

    worker.log.info(
        'Router transport created for "{transport_id}" [transport_class={transport_class}]',
        transport_id=transport_id,
        transport_class=hltype(transport.__class__),
    )
    return transport
class TransportController(WorkerController):
    """
    Services shared between RouterController and ProxyController
    """

    def __init__(self, config=None, reactor=None, personality=None):
        super(TransportController, self).__init__(
            config=config,
            reactor=reactor,
            personality=personality,
        )

        # map: transport ID -> RouterTransport
        self.transports = {}

    @wamp.register(None)
    @inlineCallbacks
    def start_web_transport_service(self, transport_id, path, config, details=None):
        """
        Start a service on a Web transport.

        :param transport_id: The ID of the transport to start the Web transport service on.
        :type transport_id: str

        :param path: The path (absolute URL, eg "/myservice1") on which to start the service.
        :type path: str

        :param config: The Web service configuration.
        :type config: dict

        :param details: Call details.
        :type details: :class:`autobahn.wamp.types.CallDetails`

        :return: A Deferred firing with a dict holding ``transport_id``,
            ``path`` and ``config`` of the started service.
        :raises autobahn.wamp.ApplicationError: If the configuration is invalid,
            the transport does not exist, is not a running Web transport, or a
            service is already running on ``path``.
        """
        if not isinstance(config, dict) or "type" not in config:
            raise ApplicationError("crossbar.invalid_argument", "config parameter must be dict with type attribute")

        self.log.info(
            'Starting "{service_type}" Web service on path "{path}" of transport "{transport_id}" {func}',
            service_type=hlval(config.get("type", "unknown")),
            path=hlval(path),
            transport_id=hlid(transport_id),
            func=hltype(self.start_web_transport_service),
        )

        transport = self.transports.get(transport_id, None)
        if not transport:
            emsg = 'Cannot start service on transport: no transport with ID "{}"'.format(transport_id)
            self.log.error(emsg)
            raise ApplicationError("crossbar.error.not_running", emsg)

        if not isinstance(transport, self.personality.RouterWebTransport):
            emsg = "Cannot start service on transport: transport is not a Web transport (transport_type={})".format(
                hltype(transport.__class__)
            )
            self.log.error(emsg)
            raise ApplicationError("crossbar.error.not_running", emsg)

        if transport.state != self.personality.RouterTransport.STATE_STARTED:
            emsg = "Cannot start service on Web transport service: transport {} is not running (transport_state={})".format(
                transport_id, self.personality.RouterWebTransport.STATES.get(transport.state, None)
            )
            self.log.error(emsg)
            raise ApplicationError("crossbar.error.not_running", emsg)

        if path in transport.root:
            emsg = 'Cannot start service on Web transport "{}": a service is already running on path "{}"'.format(
                transport_id, path
            )
            self.log.error(emsg)
            raise ApplicationError("crossbar.error.already_running", emsg)

        # resolve the factory BEFORE announcing the start, so an unknown service
        # type is reported as a proper error and no "starting" event is published
        webservice_factory = self.personality.WEB_SERVICE_FACTORIES.get(config["type"])
        if not webservice_factory:
            emsg = 'Cannot start service on Web transport "{}": unknown Web service type "{}"'.format(
                transport_id, config["type"]
            )
            self.log.error(emsg)
            raise ApplicationError("crossbar.invalid_argument", emsg)

        # the caller is excluded from receiving the lifecycle events
        caller = details.caller if details else None
        self.publish(
            self._uri_prefix + ".on_web_transport_service_starting",
            transport_id,
            path,
            options=PublishOptions(exclude=caller),
        )

        # now actually add the web service ..
        webservice = yield maybeDeferred(webservice_factory.create, transport, path, config)
        transport.root[path] = webservice

        on_web_transport_service_started = {"transport_id": transport_id, "path": path, "config": config}
        self.publish(
            self._uri_prefix + ".on_web_transport_service_started",
            transport_id,
            path,
            on_web_transport_service_started,
            options=PublishOptions(exclude=caller),
        )

        returnValue(on_web_transport_service_started)

    @wamp.register(None)
    def stop_web_transport_service(self, transport_id, path, details=None):
        """
        Stop a service on a Web transport.

        :param transport_id: The ID of the transport to stop the Web transport service on.
        :type transport_id: str

        :param path: The path (absolute URL, eg "/myservice1") of the service to stop.
        :type path: str

        :param details: Call details.
        :type details: :class:`autobahn.wamp.types.CallDetails`

        :return: A dict holding ``transport_id`` and ``path`` of the stopped service.
        :raises autobahn.wamp.ApplicationError: If there is no running Web
            transport with the given ID, or no service on ``path``.
        """
        self.log.info(
            '{func}(transport_id={transport_id}, path="{path}")',
            func=hltype(self.stop_web_transport_service),
            transport_id=hlid(transport_id),
            path=hlval(path),
        )

        transport = self.transports.get(transport_id, None)
        if (
            not transport
            or not isinstance(transport, self.personality.RouterWebTransport)
            or transport.state != self.personality.RouterTransport.STATE_STARTED
        ):
            emsg = "Cannot stop service on Web transport: no transport with ID '{}' or transport is not a Web transport".format(
                transport_id
            )
            self.log.error(emsg)
            raise ApplicationError("crossbar.error.not_running", emsg)

        if path not in transport.root:
            emsg = "Cannot stop service on Web transport {}: no service running on path '{}'".format(
                transport_id, path
            )
            self.log.error(emsg)
            raise ApplicationError("crossbar.error.not_running", emsg)

        # the caller is excluded from receiving the lifecycle events
        caller = details.caller if details else None
        self.publish(
            self._uri_prefix + ".on_web_transport_service_stopping",
            transport_id,
            path,
            options=PublishOptions(exclude=caller),
        )

        # now actually remove the web service. note: currently this is NOT async, but direct/sync.
        # FIXME: check that the underlying Twisted Web resource doesn't need any stopping too!
        del transport.root[path]

        on_web_transport_service_stopped = {
            "transport_id": transport_id,
            "path": path,
        }
        self.publish(
            self._uri_prefix + ".on_web_transport_service_stopped",
            transport_id,
            path,
            on_web_transport_service_stopped,
            options=PublishOptions(exclude=caller),
        )

        return on_web_transport_service_stopped

    @wamp.register(None)
    def get_web_transport_service(self, transport_id, path, details=None):
        """
        Get information for a Web transport on which a service is running on the given path.

        :param transport_id: The ID of the Web transport.
        :type transport_id: str

        :param path: The path that must have a service running on it.
        :type path: str

        :param details: Call details.
        :type details: :class:`autobahn.wamp.types.CallDetails`

        :return: The marshalled transport.
            NOTE(review): this returns information about the transport, not
            about the individual Web service on ``path`` - confirm intended.
        :raises autobahn.wamp.ApplicationError: If there is no running Web
            transport with the given ID, or no service on ``path``.
        """
        self.log.debug(
            '{func}(transport_id={transport_id}, path="{path}")',
            func=hltype(self.get_web_transport_service),
            transport_id=hlid(transport_id),
            path=hlval(path),
        )

        transport = self.transports.get(transport_id, None)
        if (
            not transport
            or not isinstance(transport, self.personality.RouterWebTransport)
            or transport.state != self.personality.RouterTransport.STATE_STARTED
        ):
            emsg = "No transport with ID '{}' or transport is not a Web transport".format(transport_id)
            self.log.debug(emsg)
            raise ApplicationError("crossbar.error.not_running", emsg)

        if path not in transport.root:
            emsg = "Web transport {}: no service running on path '{}'".format(transport_id, path)
            self.log.debug(emsg)
            raise ApplicationError("crossbar.error.not_running", emsg)

        return transport.marshal()

    @wamp.register(None)
    def get_web_transport_services(self, transport_id, details=None):
        """
        Get the paths of the Web services configured on a Web transport.

        The paths are read from the transport configuration (``paths``, or
        ``web.paths`` for Universal transports), so services added at run-time
        are not reflected.

        :param transport_id: The ID of the Web transport.
        :type transport_id: str

        :param details: Call details.
        :type details: :class:`autobahn.wamp.types.CallDetails`

        :return: Sorted list of configured paths.
        :raises autobahn.wamp.ApplicationError: If there is no running Web
            transport with the given ID.
        """
        self.log.debug(
            "{func}(transport_id={transport_id})",
            func=hltype(self.get_web_transport_services),
            transport_id=hlid(transport_id),
        )

        transport = self.transports.get(transport_id, None)
        if (
            not transport
            or not isinstance(transport, self.personality.RouterWebTransport)
            or transport.state != self.personality.RouterTransport.STATE_STARTED
        ):
            emsg = "No transport with ID '{}' or transport is not a Web transport".format(transport_id)
            self.log.debug(emsg)
            raise ApplicationError("crossbar.error.not_running", emsg)

        # Web transports carry "paths" at the top level, Universal transports
        # nest them below "web" (same lookup order as when creating the root service)
        transport_config = transport.config
        paths = transport_config.get("paths") or transport_config.get("web", {}).get("paths", {})
        return sorted(paths)