Source code for crossbar.worker.testee

#####################################################################################
#
#  Copyright (c) typedef int GmbH
#  SPDX-License-Identifier: EUPL-1.2
#
#####################################################################################

from autobahn import wamp
from autobahn.twisted.websocket import WebSocketServerFactory, WebSocketServerProtocol
from autobahn.wamp.exception import ApplicationError
from twisted.internet import protocol
from twisted.internet.defer import inlineCallbacks
from txaio import make_logger

import crossbar
from crossbar.common.twisted.endpoint import create_listening_port_from_config
from crossbar.router.protocol import set_websocket_options
from crossbar.worker.controller import WorkerController

__all__ = (
    "WebSocketTesteeServerFactory",
    "StreamTesteeServerFactory",
)


class StreamTesteeServerProtocol(protocol.Protocol):
    def dataReceived(self, data):
        self.transport.write(data)


[docs] class StreamTesteeServerFactory(protocol.Factory):
[docs] protocol = StreamTesteeServerProtocol
class WebSocketTesteeServerProtocol(WebSocketServerProtocol): log = make_logger() def onMessage(self, payload, isBinary): self.sendMessage(payload, isBinary) def sendServerStatus(self, redirectUrl=None, redirectAfter=0): """ Used to send out server status/version upon receiving a HTTP/GET without upgrade to WebSocket header (and option serverStatus is True). """ try: page = self.factory._templates.get_template("cb_ws_testee_status.html") self.sendHtml( page.render( redirectUrl=redirectUrl, redirectAfter=redirectAfter, cbVersion=crossbar.__version__, wsUri=self.factory.url, ) ) except Exception as e: self.log.warn("Error rendering WebSocket status page template: {e}", e=e) class StreamingWebSocketTesteeServerProtocol(WebSocketServerProtocol): def onMessageBegin(self, isBinary): WebSocketServerProtocol.onMessageBegin(self, isBinary) self.beginMessage(isBinary=isBinary) def onMessageFrameBegin(self, length): WebSocketServerProtocol.onMessageFrameBegin(self, length) self.beginMessageFrame(length) def onMessageFrameData(self, data): self.sendMessageFrameData(data) def onMessageFrameEnd(self): pass def onMessageEnd(self): self.endMessage()
[docs] class WebSocketTesteeServerFactory(WebSocketServerFactory):
[docs] protocol = WebSocketTesteeServerProtocol
# FIXME: we currently don't use the streaming variant of the testee server protocol, # since it does not work together with WebSocket compression # protocol = StreamingWebSocketTesteeServerProtocol def __init__(self, config, templates): """ :param config: Crossbar transport configuration. :type config: dict """ options = config.get("options", {}) server = "Crossbar/{}".format(crossbar.__version__) externalPort = options.get("external_port", None) WebSocketServerFactory.__init__(self, url=config.get("url", None), server=server, externalPort=externalPort) # transport configuration
[docs] self._config = config
# Jinja2 templates for 404 etc
[docs] self._templates = templates
# set WebSocket options set_websocket_options(self, options)
class WebSocketTesteeController(WorkerController): """ A native Crossbar.io worker that runs a WebSocket testee. """ WORKER_TYPE = "websocket-testee" WORKER_TITLE = "WebSocket Testee" def __init__(self, config=None, reactor=None, personality=None): # base ctor WorkerController.__init__(self, config=config, reactor=reactor, personality=personality) @inlineCallbacks def onJoin(self, details): """ Called when worker process has joined the node's management realm. """ yield WorkerController.onJoin(self, details, publish_ready=False) # WorkerController.publish_ready() yield self.publish_ready() @wamp.register(None) def get_websocket_testee_transport(self, details=None): """ """ self.log.debug("{name}.get_websocket_testee_transport", name=self.__class__.__name__) @wamp.register(None) def start_websocket_testee_transport(self, id, config, details=None): """ """ self.log.debug("{name}.start_websocket_testee_transport", name=self.__class__.__name__) # prohibit starting a transport twice # # FIXME # if id in self.transports: # emsg = "Could not start transport: a transport with ID '{}' is already running (or starting)".format(id) # self.log.error(emsg) # raise ApplicationError('crossbar.error.already_running', emsg) # check configuration # try: self.personality.check_listening_transport_websocket(self.personality, config) except Exception as e: emsg = "Invalid WebSocket testee transport configuration: {}".format(e) self.log.error(emsg) raise ApplicationError("crossbar.error.invalid_configuration", emsg) else: self.log.debug("Starting {ttype}-transport on websocket-testee.", ttype=config["type"]) # WebSocket testee pseudo transport # if config["type"] == "websocket": transport_factory = WebSocketTesteeServerFactory(config, self._templates) # Unknown transport type # else: # should not arrive here, since we did check_transport() in the beginning raise Exception("logic error") # create transport endpoint / listening port from transport factory # d = create_listening_port_from_config( config["endpoint"], self.config.extra.cbdir, transport_factory, self._reactor, self.log ) def ok(port): # FIXME # self.transports[id] = RouterTransport(id, config, transport_factory, port) self.log.debug("Router transport '{tid}'' started and listening", tid=id) return def fail(err): emsg = "Cannot listen on transport endpoint: {}".format(err.value) self.log.error(emsg) raise ApplicationError("crossbar.error.cannot_listen", emsg) d.addCallbacks(ok, fail) return d @wamp.register(None) def stop_websocket_testee_transport(self, id, details=None): """ """ self.log.debug("{name}.stop_websocket_testee_transport", name=self.__class__.__name__) raise NotImplementedError()