Source code for crossbar.router.protocol

#####################################################################################
#
#  Copyright (c) typedef int GmbH
#  SPDX-License-Identifier: EUPL-1.2
#
#####################################################################################

import binascii
import os

# from twisted.protocols.tls import TLSMemoryBIOProtocol
from autobahn.twisted import rawsocket, websocket
from autobahn.util import hlid, hltype, hlval
from autobahn.wamp.types import TransportDetails
from autobahn.websocket.compress import PerMessageDeflateOffer, PerMessageDeflateOfferAccept
from autobahn.websocket.types import ConnectionDeny
from twisted import internet
from txaio import make_logger

import crossbar
from crossbar.common.twisted.endpoint import create_connecting_endpoint_from_config
from crossbar.router.cookiestore import CookieStoreDatabaseBacked, CookieStoreFileBacked, CookieStoreMemoryBacked

log = make_logger()

__all__ = (
    "WampWebSocketServerFactory",
    "WampRawSocketServerFactory",
    "WampWebSocketServerProtocol",
    "WampRawSocketServerProtocol",
    "WampWebSocketClientFactory",
    "WampRawSocketClientFactory",
    "WampWebSocketClientProtocol",
    "WampRawSocketClientProtocol",
)


def set_websocket_options(factory, options):
    """
    Set WebSocket options on a WebSocket or WAMP-WebSocket factory.

    :param factory: The WebSocket or WAMP-WebSocket factory to set options on.
    :type factory:  Instance of :class:`autobahn.twisted.websocket.WampWebSocketServerFactory`
                    or :class:`autobahn.twisted.websocket.WebSocketServerFactory`.
    :param options: Options from Crossbar.io transport configuration.
    :type options: dict
    """
    c = options

    # we need to pop() this, since it is not a WebSocket option to be consumed
    # by setProtocolOption(), but will get used in onConnect() ("STRICT_PROTOCOL_NEGOTIATION")
    #
    factory._requireWebSocketSubprotocol = c.pop("require_websocket_subprotocol", True)

    versions = []
    if c.get("enable_hybi10", True):
        versions.append(8)
    if c.get("enable_rfc6455", True):
        versions.append(13)

    # FIXME: enforce!!
    #
    # self.connectionCap = c.get("max_connections")

    # convert to seconds
    #
    openHandshakeTimeout = float(c.get("open_handshake_timeout", 0))
    if openHandshakeTimeout:
        openHandshakeTimeout = openHandshakeTimeout / 1000.0

    closeHandshakeTimeout = float(c.get("close_handshake_timeout", 0))
    if closeHandshakeTimeout:
        closeHandshakeTimeout = closeHandshakeTimeout / 1000.0

    autoPingInterval = None
    if "auto_ping_interval" in c:
        autoPingInterval = float(c["auto_ping_interval"]) / 1000.0

    autoPingTimeout = None
    if "auto_ping_timeout" in c:
        autoPingTimeout = float(c["auto_ping_timeout"]) / 1000.0

    # WebSocket compression
    #
    per_msg_compression = lambda _: None  # noqa
    if "compression" in c:
        # permessage-deflate
        #
        if "deflate" in c["compression"]:
            log.debug("enabling WebSocket compression (permessage-deflate)")

            params = c["compression"]["deflate"]

            request_no_context_takeover = params.get("request_no_context_takeover", False)
            request_max_window_bits = params.get("request_max_window_bits", 0)
            no_context_takeover = params.get("no_context_takeover", None)
            window_bits = params.get("max_window_bits", None)
            mem_level = params.get("memory_level", None)

            def accept(offers):
                for offer in offers:
                    if isinstance(offer, PerMessageDeflateOffer):
                        if (request_max_window_bits == 0 or offer.accept_max_window_bits) and (
                            not request_no_context_takeover or offer.accept_no_context_takeover
                        ):
                            return PerMessageDeflateOfferAccept(
                                offer,
                                request_max_window_bits=request_max_window_bits,
                                request_no_context_takeover=request_no_context_takeover,
                                no_context_takeover=no_context_takeover,
                                window_bits=window_bits,
                                mem_level=mem_level,
                            )

            per_msg_compression = accept

    if factory.isServer:
        factory.setProtocolOptions(
            versions=versions,
            webStatus=c.get("enable_webstatus", True),
            utf8validateIncoming=c.get("validate_utf8", True),
            maskServerFrames=c.get("mask_server_frames", False),
            requireMaskedClientFrames=c.get("require_masked_client_frames", True),
            applyMask=c.get("apply_mask", True),
            maxFramePayloadSize=c.get("max_frame_size", 0),
            maxMessagePayloadSize=c.get("max_message_size", 0),
            autoFragmentSize=c.get("auto_fragment_size", 0),
            failByDrop=c.get("fail_by_drop", False),
            echoCloseCodeReason=c.get("echo_close_codereason", False),
            openHandshakeTimeout=openHandshakeTimeout,
            closeHandshakeTimeout=closeHandshakeTimeout,
            tcpNoDelay=c.get("tcp_nodelay", True),
            autoPingInterval=autoPingInterval,
            autoPingTimeout=autoPingTimeout,
            autoPingSize=c.get("auto_ping_size", None),
            autoPingRestartOnAnyTraffic=c.get("auto_ping_restart_on_any_traffic", None),
            serveFlashSocketPolicy=c.get("enable_flash_policy", None),
            flashSocketPolicy=c.get("flash_policy", None),
            allowedOrigins=c.get("allowed_origins", ["*"]),
            allowNullOrigin=bool(c.get("allow_null_origin", True)),
            perMessageCompressionAccept=per_msg_compression,
        )
    else:
        factory.setProtocolOptions(
            version=None,
            utf8validateIncoming=c.get("validate_utf8", True),
            acceptMaskedServerFrames=c.get("accept_masked_server_frames", False),
            maskClientFrames=c.get("mask_client_frames", True),
            applyMask=c.get("apply_mask", True),
            maxFramePayloadSize=c.get("max_frame_size", 0),
            maxMessagePayloadSize=c.get("max_message_size", 0),
            autoFragmentSize=c.get("auto_fragment_size", 0),
            failByDrop=c.get("fail_by_drop", False),
            echoCloseCodeReason=c.get("echo_close_codereason", False),
            openHandshakeTimeout=openHandshakeTimeout,
            closeHandshakeTimeout=closeHandshakeTimeout,
            tcpNoDelay=c.get("tcp_nodelay", True),
            autoPingInterval=autoPingInterval,
            autoPingTimeout=autoPingTimeout,
            autoPingSize=c.get("auto_ping_size", None),
            autoPingRestartOnAnyTraffic=c.get("auto_ping_restart_on_any_traffic", None),
            perMessageCompressionOffers=None,
            perMessageCompressionAccept=None,
        )


[docs] class WampWebSocketServerProtocol(websocket.WampWebSocketServerProtocol): """ Crossbar.io WAMP-over-WebSocket server protocol. """
[docs] log = make_logger()
def __init__(self): super(WampWebSocketServerProtocol, self).__init__()
[docs] self._cbtid = None
[docs] def onConnect(self, request): self.log.debug("{func}(request={request})", func=hltype(self.onConnect), request=request) if self.factory.debug_traffic: from twisted.internet import reactor def print_traffic(): self.log.info( "Traffic {peer}: {wire_in} / {wire_out} in / out bytes - {ws_in} / {ws_out} in / out msgs", peer=self.peer, wire_in=self.trafficStats.incomingOctetsWireLevel, wire_out=self.trafficStats.outgoingOctetsWireLevel, ws_in=self.trafficStats.incomingWebSocketMessages, ws_out=self.trafficStats.outgoingWebSocketMessages, ) reactor.callLater(1, print_traffic) print_traffic() # if WebSocket client did not set WS subprotocol, assume "wamp.2.json" # self.STRICT_PROTOCOL_NEGOTIATION = self.factory._requireWebSocketSubprotocol # handle WebSocket opening handshake # protocol, headers = websocket.WampWebSocketServerProtocol.onConnect(self, request) self.log.debug( '{func}: proceed with WebSocket opening handshake for WebSocket subprotocol "{protocol}"', func=hltype(self.onConnect), protocol=hlval(protocol), ) try: self._origin = request.origin # transport-level WMAP authentication info # self._authid = None self._authrole = None self._authrealm = None self._authmethod = None self._authextra = None self._authprovider = None # cookie tracking and cookie-based authentication # self._cbtid = None if self.factory._cookiestore: # try to parse an already set cookie from HTTP request headers self._cbtid = self.factory._cookiestore.parse(request.headers) if self._cbtid: self.log.info( '{func}: parsed tracking/authentication cookie cbtid "{cbtid}" from HTTP request headers', func=hltype(self.onConnect), cbtid=hlval(self._cbtid), ) else: self.log.info( "{func}: no tracking/authentication cookie cbtid found in HTTP request headers!", func=hltype(self.onConnect), ) # if no cookie is set, or it doesn't exist in our database, create a new cookie if self._cbtid is None or not self.factory._cookiestore.exists(self._cbtid): self._cbtid, headers["Set-Cookie"] = self.factory._cookiestore.create() if "cookie" in self.factory._config: if ( "secure" in self.factory._config["cookie"] and self.factory._config["cookie"]["secure"] is True ): headers["Set-Cookie"] += ";Secure" if ( "http_strict" in self.factory._config["cookie"] and self.factory._config["cookie"]["http_strict"] is True ): headers["Set-Cookie"] += ";HttpOnly" if "same_site" in self.factory._config["cookie"]: headers["Set-Cookie"] += ";SameSite=" + self.factory._config["cookie"]["same_site"] self.log.info( "{func}: setting new cookie {cookie}", func=hltype(self.onConnect), cookie=hlval(headers["Set-Cookie"], color="yellow"), ) else: self.log.info( '{func}: tracking/authentication cookie cbtid "{cbtid}" already set and stored', func=hltype(self.onConnect), cbtid=hlval(self._cbtid), ) # add this WebSocket connection to the set of connections # associated with the same cookie self.factory._cookiestore.addProto(self._cbtid, self) self.log.debug("Cookie tracking enabled on WebSocket connection {ws}", ws=self) # if cookie-based authentication is enabled, set auth info from cookie store # if "auth" in self.factory._config and "cookie" in self.factory._config["auth"]: self._authid, self._authrole, self._authmethod, self._authrealm, self._authextra = ( self.factory._cookiestore.getAuth(self._cbtid) ) if self._authid: # there is a cookie set, and the cookie was previously successfully authenticated, # so immediately authenticate the client using that information self._authprovider = "cookie" self.log.info( '{func} authenticated client via cookie {cookiename}={cbtid} as authid="{authid}", authrole="{authrole}", authmethod="{authmethod}", authprovider="{authprovider}", authrealm="{authrealm}"', func=hltype(self.onConnect), cookiename=self.factory._cookiestore._cookie_id_field, cbtid=hlval(self._cbtid, color="green"), authid=hlid(self._authid), authrole=hlid(self._authrole), authmethod=hlval(self._authmethod), authprovider=hlval(self._authprovider), authrealm=hlid(self._authrealm), ) else: # there is a cookie set, but the cookie wasn't authenticated yet using a different auth method self.log.info( "{func} cookie-based authentication enabled, but cookie {cbtid} is not authenticated yet", cbtid=hlval(self._cbtid, color="blue"), func=hltype(self.onConnect), ) else: self.log.info( "{func} cookie-based authentication disabled on connection", func=hltype(self.onConnect) ) else: self.log.info("{func} cookie tracking disabled on WebSocket connection", func=hltype(self.onConnect)) # negotiated WebSocket subprotocol in use, e.g. "wamp.2.cbor.batched" self._transport_details.websocket_protocol = protocol # WebSocket extensions in use. will be filled in onOpen(), see below self._transport_details.websocket_extensions_in_use = None # Crossbar.io tracking ID (for cookie tracking) self._transport_details.http_cbtid = self._cbtid # all HTTP headers as received by the WebSocket client self._transport_details.http_headers_received = request.headers # only customer user headers (such as cookie) self._transport_details.http_headers_sent = headers # accept the WebSocket connection, speaking subprotocol `protocol` # and setting HTTP headers `headers` return protocol, headers except: self.log.failure()
[docs] def onOpen(self): # note the WebSocket extensions negotiated self._transport_details.websocket_extensions_in_use = [e.__json__() for e in self.websocket_extensions_in_use] return super(WampWebSocketServerProtocol, self).onOpen()
[docs] def sendServerStatus(self, redirectUrl=None, redirectAfter=0): """ Used to send out server status/version upon receiving a HTTP/GET without upgrade to WebSocket header (and option serverStatus is True). """ try: page = self.factory._templates.get_template("cb_ws_status.html") self.sendHtml( page.render( redirectUrl=redirectUrl, redirectAfter=redirectAfter, cbVersion=crossbar.__version__, wsUri=self.factory.url, peer=self.peer, workerPid=os.getpid(), ) ) except Exception: self.log.failure("Error rendering WebSocket status page template: {log_failure.value}")
[docs] def onClose(self, wasClean, code, reason): super(WampWebSocketServerProtocol, self).onClose(wasClean, code, reason) # remove this WebSocket connection from the set of connections # associated with the same cookie if self._cbtid: self.factory._cookiestore.dropProto(self._cbtid, self)
[docs] class WampWebSocketServerFactory(websocket.WampWebSocketServerFactory): """ Crossbar.io WAMP-over-WebSocket server factory. """
[docs] showServerVersion = False
[docs] protocol = WampWebSocketServerProtocol
[docs] log = make_logger()
def __init__(self, factory, cbdir, config, templates): """ :param factory: WAMP session factory. :type factory: An instance of .. :param cbdir: The Crossbar.io node directory. :type cbdir: str :param config: Crossbar transport configuration. :type config: dict :param templates: :type templates: """
[docs] self.debug_traffic = config.get("debug_traffic", False)
options = config.get("options", {}) # announce Crossbar.io server version # self.showServerVersion = options.get("show_server_version", self.showServerVersion) if self.showServerVersion: server = "Crossbar/{}".format(crossbar.__version__) else: # do not disclose crossbar version server = "Crossbar" # external (public) listening port (eg when running behind a reverse proxy) # externalPort = options.get("external_port", None) # explicit list of WAMP serializers # if "serializers" in config: serializers = [] sers = set(config["serializers"]) if "flatbuffers" in sers: # try FlatBuffers WAMP serializer try: from autobahn.wamp.serializer import FlatBuffersSerializer serializers.append(FlatBuffersSerializer(batched=True)) serializers.append(FlatBuffersSerializer()) except ImportError("FlatBuffersSerializer"): self.log.warn("Warning: could not load WAMP-FlatBuffers serializer") else: sers.discard("flatbuffers") if "cbor" in sers: # try CBOR WAMP serializer try: from autobahn.wamp.serializer import CBORSerializer serializers.append(CBORSerializer(batched=True)) serializers.append(CBORSerializer()) except ImportError("CBORSerializer"): self.log.warn("Warning: could not load WAMP-CBOR serializer") else: sers.discard("cbor") if "msgpack" in sers: # try MsgPack WAMP serializer try: from autobahn.wamp.serializer import MsgPackSerializer serializers.append(MsgPackSerializer(batched=True)) serializers.append(MsgPackSerializer()) except ImportError("MsgPackSerializer"): self.log.warn("Warning: could not load WAMP-MsgPack serializer") else: sers.discard("msgpack") if "ubjson" in sers: # try UBJSON WAMP serializer try: from autobahn.wamp.serializer import UBJSONSerializer serializers.append(UBJSONSerializer(batched=True)) serializers.append(UBJSONSerializer()) except ImportError("UBJSONSerializer"): self.log.warn("Warning: could not load WAMP-UBJSON serializer") else: sers.discard("ubjson") if "json" in sers: # try JSON WAMP serializer try: from autobahn.wamp.serializer import JsonSerializer serializers.append(JsonSerializer(batched=True)) serializers.append(JsonSerializer()) except ImportError("JsonSerializer"): self.log.warn("Warning: could not load WAMP-JSON serializer") else: sers.discard("json") if not serializers: raise Exception("no valid WAMP serializers specified") if len(sers) > 0: raise Exception("invalid WAMP serializers specified (the following were unprocessed) {}".format(sers)) else: serializers = None websocket.WampWebSocketServerFactory.__init__( self, factory, serializers=serializers, url=config.get("url", None), server=server, externalPort=externalPort, ) # Crossbar.io node directory
[docs] self._cbdir = cbdir
# transport configuration
[docs] self._config = config
# Jinja2 templates for 404 etc
[docs] self._templates = templates
# enable cookie tracking if a cookie store is configured if "cookie" in config: # cookie store configuration item cookie_config = config["cookie"] # cookie store cookie_store_config = cookie_config["store"] cookie_store_type = cookie_store_config["type"] # setup ephemeral, memory-backed cookie store if cookie_store_type == "memory": self._cookiestore = CookieStoreMemoryBacked(cookie_config) self.log.info("Memory-backed cookie store active.") # setup persistent, file-backed cookie store elif cookie_store_type == "file": cookie_store_file = os.path.abspath(os.path.join(self._cbdir, cookie_store_config["filename"])) self._cookiestore = CookieStoreFileBacked(cookie_store_file, cookie_config) self.log.info( "File-backed cookie store active {cookie_store_file}", cookie_store_file=hlval(cookie_store_file) ) # setup persistent, database-backed cookie store elif cookie_store_type == "database": cookie_dbpath = os.path.abspath(os.path.join(self._cbdir, cookie_store_config["path"])) self._cookiestore = CookieStoreDatabaseBacked(cookie_dbpath, cookie_config) self.log.info( "Database-backed cookie store active! [cookiestore={cookiestore}]", cookiestore=hltype(CookieStoreDatabaseBacked), ) else: # should not arrive here as the config should have been checked before raise NotImplementedError( '{}: implementation of cookiestore of type "{}" missing'.format( self.__class__.__name__, cookie_store_type ) ) else: # this disables cookie tracking (both with or without WAMP-cookie authentication) self._cookiestore = None # set WebSocket options set_websocket_options(self, options)
def set_rawsocket_options(factory, options): """ Set RawSocket options on a RawSocket or WAMP-RawSocket (server or client )factory. :param factory: The RawSocket or WAMP-RawSocket factory to set options on. :type factory: Instance of :class:`autobahn.twisted.rawsocket.WampRawSocketServerFactory` or :class:`autobahn.twisted.rawsocket.WampRawSocketClientFactory`. :param options: RawSocket transport options item from Crossbar.io transport configuration. :type options: dict """ c = options factory.setProtocolOptions(maxMessagePayloadSize=c.get("max_message_size", None))
[docs] class WampRawSocketServerProtocol(rawsocket.WampRawSocketServerProtocol): """ Crossbar.io WAMP-over-RawSocket server protocol. """
[docs] log = make_logger()
[docs] def connectionMade(self): rawsocket.WampRawSocketServerProtocol.connectionMade(self) # transport authentication # self._authid = None self._authrole = None self._authrealm = None self._authmethod = None self._authprovider = None self._authextra = None # cookie tracking ID # self._cbtid = None
[docs] def _on_handshake_complete(self): self._transport_details.channel_serializer = TransportDetails.CHANNEL_SERIALIZER_FROM_STR[ self._serializer.SERIALIZER_ID ] self._transport_details.websocket_protocol = "wamp.2.{}".format(self._serializer.SERIALIZER_ID) return rawsocket.WampRawSocketServerProtocol._on_handshake_complete(self)
[docs] class WampRawSocketServerFactory(rawsocket.WampRawSocketServerFactory): """ Crossbar.io WAMP-over-RawSocket server factory. """
[docs] protocol = WampRawSocketServerProtocol
[docs] log = make_logger()
def __init__(self, factory, config): # remember transport configuration #
[docs] self._config = config
# explicit list of WAMP serializers # if "serializers" in config: serializers = [] sers = set(config["serializers"]) if "flatbuffers" in sers: # try FlatBuffers WAMP serializer try: from autobahn.wamp.serializer import FlatBuffersSerializer serializers.append(FlatBuffersSerializer()) except ImportError: self.log.warn("Warning: could not load WAMP-FlatBuffers serializer") else: sers.discard("flatbuffers") if "cbor" in sers: # try CBOR WAMP serializer try: from autobahn.wamp.serializer import CBORSerializer serializers.append(CBORSerializer()) except ImportError: self.log.warn("Warning: could not load WAMP-CBOR serializer") else: sers.discard("cbor") if "msgpack" in sers: # try MsgPack WAMP serializer try: from autobahn.wamp.serializer import MsgPackSerializer serializer = MsgPackSerializer() serializer._serializer.ENABLE_V5 = False # FIXME serializers.append(serializer) except ImportError: self.log.warn("Warning: could not load WAMP-MsgPack serializer") else: sers.discard("msgpack") if "ubjson" in sers: # try UBJSON WAMP serializer try: from autobahn.wamp.serializer import UBJSONSerializer serializers.append(UBJSONSerializer(batched=True)) serializers.append(UBJSONSerializer()) except ImportError: self.log.warn("Warning: could not load WAMP-UBJSON serializer") else: sers.discard("ubjson") if "json" in sers: # try JSON WAMP serializer try: from autobahn.wamp.serializer import JsonSerializer serializers.append(JsonSerializer()) except ImportError: self.log.warn("Warning: could not load WAMP-JSON serializer") else: sers.discard("json") if not serializers: raise Exception("no valid WAMP serializers specified") if len(sers) > 0: raise Exception("invalid WAMP serializers specified (the following were unprocessed) {}".format(sers)) else: serializers = None rawsocket.WampRawSocketServerFactory.__init__(self, factory, serializers) if "options" in config: set_rawsocket_options(self, config["options"]) self.log.debug( "RawSocket transport factory created using {serializers} serializers, max. message size {maxsize}", serializers=serializers, maxsize=self._max_message_size, )
[docs] class WampWebSocketClientProtocol(websocket.WampWebSocketClientProtocol): """ Crossbar.io WAMP-over-WebSocket client protocol. """
[docs] class WampWebSocketClientFactory(websocket.WampWebSocketClientFactory): """ Crossbar.io WAMP-over-WebSocket client factory. """
[docs] protocol = WampWebSocketClientProtocol
[docs] def buildProtocol(self, addr): self._proto = websocket.WampWebSocketClientFactory.buildProtocol(self, addr) return self._proto
[docs] class WampRawSocketClientProtocol(rawsocket.WampRawSocketClientProtocol): """ Crossbar.io WAMP-over-RawSocket client protocol. """
[docs] class WampRawSocketClientFactory(rawsocket.WampRawSocketClientFactory): """ Crossbar.io WAMP-over-RawSocket client factory. """
[docs] protocol = WampRawSocketClientProtocol
def __init__(self, factory, config): # transport configuration
[docs] self._config = config
# WAMP serializer # serid = config.get("serializer", "json") if serid == "json": # try JSON WAMP serializer try: from autobahn.wamp.serializer import JsonSerializer serializer = JsonSerializer() except ImportError: raise Exception("could not load WAMP-JSON serializer") elif serid == "msgpack": # try MessagePack WAMP serializer try: from autobahn.wamp.serializer import MsgPackSerializer serializer = MsgPackSerializer() serializer._serializer.ENABLE_V5 = False # FIXME except ImportError: raise Exception("could not load WAMP-MessagePack serializer") elif serid == "cbor": # try CBOR WAMP serializer try: from autobahn.wamp.serializer import CBORSerializer serializer = CBORSerializer() except ImportError: raise Exception("could not load WAMP-CBOR serializer") else: raise Exception("invalid WAMP serializer '{}'".format(serid)) rawsocket.WampRawSocketClientFactory.__init__(self, factory, serializer)
class WebSocketReverseProxyClientProtocol(websocket.WebSocketClientProtocol): log = make_logger() def onConnect(self, response): self.log.debug("WebSocketReverseProxyClientProtocol.onConnect(response={response})", response=response) # response={"peer": "tcp4:127.0.0.1:9000", "headers": {"server": "AutobahnPython/17.10.1", "upgrade": "WebSocket", "connection": "Upgrade", "sec-websocket-accept": "tJFVMTSzGypbxQb8GW1dK/QLMZQ="}, "version": 18, "protocol": null, "extensions": []} try: # accept = ConnectionAccept(subprotocol=response.protocol, headers=None) headers = {} accept = response.protocol, headers self.backend_on_connect.callback(accept) except: self.log.failure() def onOpen(self): self.log.debug("WebSocketReverseProxyClientProtocol.onOpen()") self.factory.frontend_protocol.onOpen() def onMessage(self, payload, isBinary): self.log.debug( "WebSocketReverseProxyClientProtocol.onMessage(payload={payload}, isBinary={isBinary})", payload="{}..".format((binascii.b2a_hex(payload).decode() if isBinary else payload.decode("utf8"))[:16]), isBinary=isBinary, ) self.factory.frontend_protocol.sendMessage(payload, isBinary) def onClose(self, wasClean, code, reason): self.log.debug( "WebSocketReverseProxyClientProtocol.onClose(wasClean={wasClean}, code={code}, reason={reason})", wasClean=wasClean, code=code, reason=reason, ) code = 1000 self.factory.frontend_protocol.sendClose(code, reason) class WebSocketReverseProxyClientFactory(websocket.WebSocketClientFactory): log = make_logger() protocol = WebSocketReverseProxyClientProtocol def __init__(self, *args, **kwargs): self.frontend_protocol = kwargs.pop("frontend_protocol", None) assert self.frontend_protocol is not None frontend_request = kwargs.pop("frontend_request", None) if frontend_request: protocols = frontend_request.protocols # headers = frontend_request.headers else: protocols = None # headers = None websocket.WebSocketClientFactory.__init__(self, *args, protocols=protocols, **kwargs) # set WebSocket options self.backend_config = self.frontend_protocol.backend_config set_websocket_options(self, self.backend_config.get("options", {})) class WebSocketReverseProxyServerProtocol(websocket.WebSocketServerProtocol): """ Server protocol to accept incoming (frontend) WebSocket connections and forward traffic to a backend WebSocket server. This protocol supports any WebSocket based subprotocol with text or binary payload. The WebSocket connection to the backend WebSocket server is configurable on the factory for this protocol. """ log = make_logger() def onConnect(self, request): """ Incoming (frontend) WebSocket connection accepted. Forward connect to backend WebSocket server. """ self.log.debug("WebSocketReverseProxyServerProtocol.onConnect(request={request})", request=request) self.backend_config = self.factory.path_config["backend"] self.backend_factory = WebSocketReverseProxyClientFactory( frontend_protocol=self, frontend_request=request, url=self.backend_config.get("url", None) ) self.backend_factory.noisy = False self.backend_protocol = None # create and connect client endpoint # endpoint = create_connecting_endpoint_from_config( self.backend_config["endpoint"], None, self.factory.reactor, self.log ) backend_on_connect = internet.defer.Deferred() # now, actually connect the client # d = endpoint.connect(self.backend_factory) def on_connect_success(proto): self.log.debug("WebSocketReverseProxyServerProtocol.onConnect(..): connected") proto.backend_on_connect = backend_on_connect self.backend_protocol = proto def on_connect_error(err): deny = ConnectionDeny(ConnectionDeny.SERVICE_UNAVAILABLE, "WebSocket reverse proxy backend not reachable") backend_on_connect.errback(deny) d.addCallbacks(on_connect_success, on_connect_error) return backend_on_connect def onOpen(self): self.log.debug("WebSocketReverseProxyServerProtocol.onOpen()") def onMessage(self, payload, isBinary): if self.backend_protocol: self.log.debug( "WebSocketReverseProxyServerProtocol: forwarding WebSocket message from frontend connection to backend connection" ) self.backend_protocol.sendMessage(payload, isBinary) else: self.log.warn( "WebSocketReverseProxyServerProtocol: received WebSocket message on frontend connection while there is no backend connection! dropping WebSocket message" ) def onClose(self, wasClean, code, reason): if self.backend_protocol: self.log.debug( "WebSocketReverseProxyServerProtocol: forwarding close from frontend connection to backend connection" ) code = 1000 self.backend_protocol.sendClose(code, reason) else: self.log.warn( "WebSocketReverseProxyServerProtocol: received WebSocket close on frontend connection while there is no backend connection! dropping WebSocket close" ) class WebSocketReverseProxyServerFactory(websocket.WebSocketServerFactory): """ Reverse WebSocket proxy factory. This factory produces protocols that accept incoming WebSocket connections, connect to a backend WebSocket server and forward WebSocket messages received from both side to the other side. Reverse WebSocket proxy factories are then given to WebSocketResource instances in a Web transport resource tree. """ protocol = WebSocketReverseProxyServerProtocol log = make_logger() def __init__(self, reactor, path_config): """ :param path_config: The path configuration of the Web transport resource. :type path_config: dict """ self.reactor = reactor self.path_config = path_config url = path_config.get("url", None) options = path_config.get("options", {}) showServerVersion = options.get("show_server_version", True) if showServerVersion: server = "Crossbar/{}".format(crossbar.__version__) else: server = "Crossbar" externalPort = options.get("external_port", None) protocols = None headers = None websocket.WebSocketServerFactory.__init__( self, reactor=reactor, url=url, protocols=protocols, server=server, headers=headers, externalPort=externalPort, ) # set WebSocket options set_websocket_options(self, options)