Source code for crossbar.worker.proxy

#####################################################################################
#
#  Copyright (c) typedef int GmbH
#  SPDX-License-Identifier: EUPL-1.2
#
#####################################################################################

import os
from pprint import pformat
from typing import Any, Dict, Optional, Set, Tuple

from autobahn import util, wamp
from autobahn.twisted.component import Component, _create_transport_endpoint, _create_transport_factory
from autobahn.twisted.wamp import ApplicationSession, Session
from autobahn.wamp import cryptosign, message, types
from autobahn.wamp.auth import create_authenticator
from autobahn.wamp.component import _create_transport
from autobahn.wamp.exception import ApplicationError, ProtocolError, TransportLost
from autobahn.wamp.interfaces import IMessage, ITransportHandler
from autobahn.wamp.role import RoleBrokerFeatures, RoleDealerFeatures
from twisted.internet.base import ReactorBase
from twisted.internet.defer import Deferred, inlineCallbacks, returnValue
from twisted.internet.error import DNSLookupError
from txaio import as_future, make_logger, time_ns

from crossbar._util import hlid, hltype, hlval
from crossbar.common.key import _read_node_key
from crossbar.interfaces import IRealmContainer
from crossbar.node import worker
from crossbar.router.auth import AUTHMETHOD_MAP, PendingAuthScram, PendingAuthTicket, PendingAuthWampCra
from crossbar.router.session import RouterFactory, RouterSessionFactory
from crossbar.worker.controller import WorkerController
from crossbar.worker.transport import TransportController

try:
    from crossbar.router.auth import PendingAuthCryptosign, PendingAuthCryptosignProxy
except ImportError:
    PendingAuthCryptosign = None  # type: ignore
    PendingAuthCryptosignProxy = None  # type: ignore

__all__ = (
    "ProxyWorkerProcess",
    "ProxyController",
    "ProxyConnection",
    "ProxyRoute",
)

log = make_logger()


[docs] class ProxyWorkerProcess(worker.NativeWorkerProcess):
[docs] TYPE = "proxy"
[docs] LOGNAME = "Proxy"
class ProxyFrontendSession(object): """ A router-side proxy session that handles incoming client connections. """ # Note: "roles" come from self._router.attach() in non-proxy code ROLES = { "broker": RoleBrokerFeatures( publisher_identification=True, pattern_based_subscription=True, session_meta_api=True, subscription_meta_api=True, subscriber_blackwhite_listing=True, publisher_exclusion=True, subscription_revocation=True, event_retention=True, payload_transparency=True, payload_encryption_cryptobox=True, ), "dealer": RoleDealerFeatures( caller_identification=True, pattern_based_registration=True, session_meta_api=True, registration_meta_api=True, shared_registration=True, progressive_call_results=True, registration_revocation=True, payload_transparency=True, testament_meta_api=True, payload_encryption_cryptobox=True, call_canceling=True, ), } log = make_logger() def __init__(self, router_factory): self._router_factory = router_factory self._controller = router_factory.worker self._reset() def _reset(self): # after the frontend connection is open, this will be the frontend transport self.transport = None # if we have a backend connection, it'll be here (and be a # Session instance) self._backend_session = None # pending session id (before the session is fully joined) self._pending_session_id = None # authenticated+joined session information self._session_id = None self._authid = None self._realm = None self._authid = None self._authrole = None self._authmethod = None self._authprovider = None self._authextra = None self._custom_authextra = {} @property def realm(self): return self._realm @property def authid(self): return self._authid @property def authrole(self): return self._authrole @property def authmethod(self): return self._authmethod @property def authprovider(self): return self._authprovider @property def authextra(self): return self._authextra def onOpen(self, transport): """ Callback fired when transport is open. May run asynchronously. The transport is considered running and is_open() would return true, as soon as this callback has completed successfully. :param transport: The WAMP transport. :type transport: object implementing :class:`autobahn.wamp.interfaces.ITransport` """ self.log.debug("{func}(transport={transport})", func=hltype(self.onOpen), transport=transport) self.transport = transport # transport configuration if hasattr(self.transport, "factory") and hasattr(self.transport.factory, "_config"): self._transport_config = self.transport.factory._config else: self._transport_config = {} self._custom_authextra = { "x_cb_proxy_node": self._router_factory._node_id, "x_cb_proxy_worker": self._router_factory._worker_id, "x_cb_proxy_peer": str(self.transport.peer), "x_cb_proxy_pid": os.getpid(), } self.log.info( "{func} proxy frontend session connected from peer {peer}", func=hltype(self.onOpen), peer=hlval(self.transport.transport_details.peer) if self.transport.transport_details else None, ) def onClose(self, wasClean): """ Callback fired when the transport has been closed. :param wasClean: Indicates if the transport has been closed regularly. :type wasClean: bool """ self.log.info( "{func} proxy frontend session {sessionId} closed (wasClean={wasClean})", func=hltype(self.onClose), sessionId=hlid(self._session_id), wasClean=wasClean, ) # actually, at this point, the backend session should already be gone, but better check! if self._backend_session: self._controller.unmap_backend(self, self._backend_session) self._backend_session = None # reset everything (even though this frontend protocol instance should not be reused anyways) self._reset() @inlineCallbacks def onMessage(self, msg): """ Callback fired when a WAMP message was received. May run asynchronously. The callback should return or fire the returned deferred/future when it's done processing the message. In particular, an implementation of this callback must not access the message afterwards. :param msg: The WAMP message received. :type msg: object implementing :class:`autobahn.wamp.interfaces.IMessage` """ self.log.debug("{func} proxy frontend session onMessage(msg={msg})", func=hltype(self.onMessage), msg=msg) if self._session_id is None: # no frontend session established yet, so we expect one of HELLO, ABORT, AUTHENTICATE # https://wamp-proto.org/_static/gen/wamp_latest.html#session-establishment if isinstance(msg, message.Hello): yield self._process_Hello(msg) # https://wamp-proto.org/_static/gen/wamp_latest.html#session-closing elif isinstance(msg, message.Abort): self.transport.send( message.Abort(ApplicationError.AUTHENTICATION_FAILED, message="Proxy authentication failed") ) # https://wamp-proto.org/_static/gen/wamp_latest.html#wamp-level-authentication elif isinstance(msg, message.Authenticate): self._process_Authenticate(msg) else: raise ProtocolError( "Received {} message while proxy frontend session is not joined".format(msg.__class__.__name__) ) else: # frontend session is established: process WAMP message if ( isinstance(msg, message.Hello) or isinstance(msg, message.Abort) or isinstance(msg, message.Authenticate) ): raise ProtocolError( "Received {} message while proxy frontend session is already joined".format(msg.__class__.__name__) ) # https://wamp-proto.org/_static/gen/wamp_latest.html#session-closing elif isinstance(msg, message.Goodbye): # compare this code here for proxies to :meth:`RouterSession.onLeave` for routers # 1) if asked to explicitly close the session if msg.reason == "wamp.close.logout": cookie_deleted = None cnt_kicked = 0 # if cookie was set on transport if ( self.transport and hasattr(self.transport, "_cbtid") and self.transport._cbtid and self.transport.factory._cookiestore ): cbtid = self.transport._cbtid cs = self.transport.factory._cookiestore # set cookie to "not authenticated" # cs.setAuth(cbtid, None, None, None, None, None) cs.delAuth(cbtid) cookie_deleted = cbtid # kick all transport protos (e.g. WampWebSocketServerProtocol) for the same auth cookie for proto in cs.getProtos(cbtid): # but don't kick ourselves if proto != self.transport: proto.sendClose() cnt_kicked += 1 self.log.info( "{func} {action} completed for session {session_id} (cookie authentication deleted: " '"{cookie_deleted}", pro-actively kicked (other) sessions: {cnt_kicked})', action=hlval("wamp.close.logout", color="red"), session_id=hlid(self._session_id), cookie_deleted=hlval(cookie_deleted, color="red") if cookie_deleted else "none", cnt_kicked=hlval(cnt_kicked, color="red") if cnt_kicked else "none", func=hltype(self.onMessage), ) # 2) if we currently have a session from proxy to backend router (as normally the case), # disconnect and unmap that session as well if self._backend_session: self._controller.unmap_backend( self, self._backend_session, leave_reason=msg.reason, leave_message=msg.message ) self._backend_session = None else: self.log.warn( "{func} frontend session left, but no active backend session to close!", func=hltype(self.onMessage), ) # 3) complete the closing handshake (initiated by the client in this case) by replying with GOODBYE self.transport.send(message.Goodbye(message="Proxy session closing")) else: if self._backend_session is None or self._backend_session._transport is None: raise TransportLost( "Expected to relay {} message, but proxy backend session or transport is gone".format( msg.__class__.__name__, ) ) else: # if we have an active backend connection, forward the WAMP message self._backend_session._transport.send(msg) def frontend_accepted(self, accept): # we have done authentication with the client; now we can connect to # the backend (and we wait to tell the client they're # welcome until we have actually connected to the # backend). self.log.info( "{func} proxy frontend session accepted ({accept})", func=hltype(self.frontend_accepted), accept=accept ) if ( hasattr(self.transport, "_cbtid") and hasattr(self.transport.factory, "_cookiestore") and self.transport.factory._cookiestore ): self.transport.factory._cookiestore.setAuth( self.transport._cbtid, accept.authid, accept.authrole, accept.authmethod, accept.authextra, accept.realm, ) result = Deferred() @inlineCallbacks def backend_connected(backend: ProxyBackendSession): # bytestream-level transport to backend router worker connected. try: # first, wait for the WAMP-level transport to connect before starting to join yield backend._on_connect # while we were yielding, frontend session might have been closed (transport disconnected) if self.transport is None: backend.disconnect() raise TransportLost("Proxy frontend session disconnected while connecting to backend") # node private key key = _read_node_key(self._controller._cbdir, private=False) # authid of the connecting backend (proxy service) session is this proxy node's ID backend_authid = self._controller.node_id # authmethods we announce to the backend router we connect to authmethods = list(backend._authenticators.keys()) # authentication extra we transmit from the proxy to backend router worker authextra = { # for WAMP-cryptosign authentication of the proxy frontend # to the backend router "pubkey": key["hex"], # forward authentication credentials of the connecting client # # the following are the effective (realm, authid, authrole) under # which the client (proxy frontend connection) was successfully # authenticated (using the authmethod+authprovider) "proxy_realm": accept.realm, "proxy_authid": accept.authid, "proxy_authrole": accept.authrole, "proxy_authmethod": accept.authmethod, "proxy_authprovider": accept.authprovider, # this is the authextra returned from the proxy frontend authenticator "proxy_authextra": accept.authextra or {}, } # get marshalled transport details for this proxy frontend session if self.transport.transport_details: # IMPORTANT: this attribute "transport" is in addition to _other_ attributes that # might be already present from "accept.authextra". assert "transport" not in authextra["proxy_authextra"] # these are the transport details from the proxy frontend session. this is picked # up in PendingAuthAnonymousProxy.hello() and PendingAuthCryptosignProxy.hello() authextra["proxy_authextra"]["transport"] = self.transport.transport_details.marshal() self.log.info( "{func} proxy backend session authenticating with authmethods={authmethods}, pubkey={pubkey}: " 'proxy_authid="{proxy_authid}", proxy_authrole="{proxy_authrole}", proxy_realm="{proxy_realm}"', func=hltype(backend_connected), authmethods=hlval(authmethods), pubkey=hlval(authextra["pubkey"]), proxy_authid=hlid(authextra["proxy_authid"]), proxy_authrole=hlid(authextra["proxy_authrole"]), proxy_realm=hlid(authextra["proxy_realm"]), ) # now join WAMP session, which might first start WAMP authentication (for authmethod "cryptosign-proxy") backend.join(accept.realm, authmethods=authmethods, authid=backend_authid, authextra=authextra) def backend_joined(session, details): self.log.debug( "{func} proxy backend session joined (backend_session_id={backend_session_id})", backend_session_id=hlid(details.session), backend_session=session, pending_session_id=self._pending_session_id, func=hltype(backend_joined), ) # we're ready now! store and return the backend session self._backend_session = session # we set the frontend session ID to that of the backend session mapped for our frontend session .. self._session_id = details.session # .. NOT our (fake) pending session ID (generated in the proxy worker) # self._session_id = self._pending_session_id # credentials of the backend session mapped for our frontend session self._realm = details.realm self._authid = details.authid self._authrole = details.authrole # this is the authextra returned for the backend session mapped for our frontend session self._authextra = details.authextra # authentication method & provider are the requested (and succeeding) ones self._authmethod = accept.authmethod self._authprovider = accept.authprovider result.callback(session) backend.on("join", backend_joined) yield backend._on_ready return backend except Exception as e: self.log.failure() result.errback(e) def backend_failed(fail): result.errback(fail) # map and connect bytestream-level transport to backend router worker backend_d = self._controller.map_backend( self, accept.realm, accept.authid, accept.authrole, accept.authextra, ) backend_d.addCallback(backend_connected) backend_d.addErrback(backend_failed) return result def _send_if_transport_is_alive(self, msg): # this is a debugging helper # We received a message on the backend connection (auth, welcome) and need to send it to the client # this is semantically the same as _forward, and it solve the same issues # We don't send here, but we also do not cause an exception if the transport is gone if self.transport: self.transport.send(msg) else: self.log.debug("Trying to send a message to the client, but no frontend transport! [{msg}]", msg=msg) def _forward(self, msg): # we received a message on the backend connection: forward to client over frontend connection if self.transport: self.transport.send(msg) else: # FIXME: can we improve this? # eg when the frontend client connection has closed before we had a chance to stop the backend and we still # receive eg a Result(request=23033, args=[] ..) from the backend. self.log.debug("Trying to forward a message to the client, but no frontend transport! [{msg}]", msg=msg) @inlineCallbacks def _process_Hello(self, msg): """ We have received a Hello from the frontend client. Now we do any authentication necessary with them and connect to our backend. """ self.log.debug( "{func} proxy frontend session processing HELLO (msg={msg})", func=hltype(self._process_Hello), msg=msg ) self._pending_session_id = util.id() self._goodbye_sent = False realm = msg.realm # allow "Personality" classes to add authmethods extra_auth_methods = self._controller.personality.EXTRA_AUTH_METHODS authmethods = list(extra_auth_methods.keys()) + (msg.authmethods or ["anonymous"]) # if the client had a reassigned realm during authentication, restore it from the cookie if hasattr(self.transport, "_authrealm") and self.transport._authrealm: if "cookie" in authmethods: realm = self.transport._authrealm # noqa authextra = self.transport._authextra # noqa elif self.transport._authprovider == "cookie": # revoke authentication and invalidate cookie (will be revalidated if following auth is successful) self.transport._authmethod = None self.transport._authrealm = None self.transport._authid = None if hasattr(self.transport, "_cbtid"): self.transport.factory._cookiestore.setAuth(self.transport._cbtid, None, None, None, None, None) else: pass # TLS authentication is not revoked here # already authenticated, e.g. via HTTP-cookie or TLS-client-certificate authentication if self.transport._authid is not None and ( self.transport._authmethod == "trusted" or self.transport._authprovider in authmethods ): msg.realm = realm msg.authid = self.transport._authid msg.authrole = self.transport._authrole msg.authextra = self.transport._authextra details = types.HelloDetails( realm=realm, authmethods=authmethods, authid=msg.authid, authrole=msg.authrole, authextra=msg.authextra, session_roles=msg.roles, pending_session=self._pending_session_id, ) # start authentication based on configuration, compare/sync with code here: # https://github.com/crossbario/crossbar/blob/6b6e25b1356b0641eff5dc5086d3971ecfb9a421/crossbar/router/session.py#L861 auth_config = self._transport_config.get("auth", None) # if authentication is _not_ configured, allow anyone to join as "anonymous"! if not auth_config: # we ignore any details.authid the client might have announced, and use # a cookie value or a random value if hasattr(self.transport, "_cbtid") and self.transport._cbtid: # if cookie tracking is enabled, set authid to cookie value authid = self.transport._cbtid else: # if no cookie tracking, generate a random value for authid authid = util.generate_serial_number() # FIXME: really forward any requested authrole? authrole = msg.authrole auth_config = { "anonymous": { "type": "static", "authrole": authrole, "authid": authid, } } self.log.warn( "{func} no authentication configured for proxy frontend session (using builtin anonymous " "access policy)", func=hltype(self._process_Hello), ) # iterate over authentication methods announced by client .. for authmethod in authmethods: # invalid authmethod if authmethod not in AUTHMETHOD_MAP and authmethod not in extra_auth_methods: self.transport.send( message.Abort( ApplicationError.AUTHENTICATION_FAILED, message='authmethod "{}" not allowed'.format(authmethod), ) ) return # authmethod is valid, but not configured: continue trying other authmethods the client is announcing if authmethod not in auth_config: continue # authmethod not available if authmethod not in AUTHMETHOD_MAP and authmethod not in extra_auth_methods: self.log.debug( "{func} client requested valid, but unavailable authentication method {authmethod}", func=hltype(self._process_Hello), authmethod=authmethod, ) continue # WAMP-Cookie authentication if authmethod == "cookie": cbtid = self.transport._cbtid if cbtid: if self.transport.factory._cookiestore.exists(cbtid): _cookie_authid, _cookie_authrole, _cookie_authmethod, _cookie_authrealm, _cookie_authextra = ( self.transport.factory._cookiestore.getAuth(cbtid) ) if _cookie_authid is None: self.log.info( "{func}: received cookie for cbtid={cbtid} not authenticated before [2]", func=hltype(self._process_Hello), cbtid=hlid(cbtid), ) continue else: self.log.debug( "{func}: authentication for received cookie {cbtid} found: authid={authid}, authrole={authrole}, authmethod={authmethod}, authrealm={authrealm}, authextra={authextra}", func=hltype(self._process_Hello), cbtid=hlid(cbtid), authid=hlid(_cookie_authid), authrole=hlid(_cookie_authrole), authmethod=hlid(_cookie_authmethod), authrealm=hlid(_cookie_authrealm), authextra=_cookie_authextra, ) hello_result = types.Accept( realm=_cookie_authrealm, authid=_cookie_authid, authrole=_cookie_authrole, authmethod=_cookie_authmethod, authprovider="cookie", authextra=_cookie_authextra, ) else: self.log.debug( "{func}: received cookie for cbtid={cbtid} not authenticated before [1]", func=hltype(self._process_Hello), cbtid=hlid(cbtid), ) continue else: # the client requested cookie authentication, but there is 1) no cookie set, # or 2) a cookie set, but that cookie wasn't authenticated before using # a different auth method (if it had been, we would never have entered here, since then # auth info would already have been extracted from the transport) # consequently, we skip this auth method and move on to next auth method. self.log.debug("{func}: no cookie set for cbtid", func=hltype(self._process_Hello)) continue else: # create instance of authenticator using authenticator class for the respective authmethod authklass = ( extra_auth_methods[authmethod] if authmethod in extra_auth_methods else AUTHMETHOD_MAP[authmethod] ) if authklass is None: self.log.warn( '{func}: skipping authenticator for authmethod "{authmethod}"', func=hltype(self._process_Hello), authmethod=hlval(authmethod), ) self.log.warn() continue self.log.info( '{func}: instantiating authenticator class {authklass} for authmethod "{authmethod}"', func=hltype(self._process_Hello), authklass=hltype(authklass), authmethod=hlval(authmethod), ) self._pending_auth = authklass( self._pending_session_id, self.transport.transport_details, self._controller, auth_config[authmethod], ) try: # call into authenticator for processing the HELLO message hello_result = yield as_future(self._pending_auth.hello, realm, details) except Exception as e: self.log.failure() self._send_if_transport_is_alive( message.Abort( ApplicationError.AUTHENTICATION_FAILED, message="Frontend connection accept failed ({})".format(e), ) ) return self.log.debug( '{func} authmethod "{authmethod}" completed with result={hello_result}', func=hltype(self._process_Hello), authmethod=hlval(authmethod), hello_result=hello_result, ) # check if client disconnected while we were yielding to authenticator if not self.transport: self.log.debug( "{func} proxy frontend disconnected while processing hello in authenticator:" ': session_id={session_id}, session={session}, details="{details}"', func=hltype(self._process_Hello), session_id=hlid(self._session_id), session=self, details=details, ) return else: # if the frontend session is accepted right away (eg when doing "anonymous" authentication), process the # frontend accept .. if isinstance(hello_result, types.Accept): try: # get a backend session mapped to the incoming frontend session session = yield self.frontend_accepted(hello_result) except Exception as e: self.log.failure() self._send_if_transport_is_alive( message.Abort( ApplicationError.AUTHENTICATION_FAILED, message="Frontend connection accept failed ({})".format(e), ) ) return if not self.transport: # we have not yet established a backend session, only authenticator session was used self.log.debug( "{func} proxy frontend disconnected while connecting backend session" ': session_id={session_id}, session={session}, details="{details}"', func=hltype(self._process_Hello), session_id=hlid(self._session_id), session=self, details=details, ) self._controller.unmap_backend(self, session) self._backend_session = None else: def _on_backend_joined(session, details): # we now got everything! the frontend is authenticated, and a backend session is associated. msg = message.Welcome( self._session_id, ProxyFrontendSession.ROLES, realm=details.realm, authid=details.authid, authrole=details.authrole, authmethod=hello_result.authmethod, authprovider=hello_result.authprovider, authextra=dict(details.authextra or {}, **self._custom_authextra), ) if self.transport: self._backend_session = session self.transport.send(msg) self.log.debug( "{func} proxy frontend session WELCOME: session_id={session_id}, session={session}, " 'details="{details}"', func=hltype(self._process_Hello), session_id=hlid(self._session_id), session=self, details=details, ) else: self.log.debug( "{func} proxy frontend disconnected while joining backend session" ': session_id={session_id}, session={session}, details="{details}"', func=hltype(self._process_Hello), session_id=hlid(self._session_id), session=self, details=details, ) self._controller.unmap_backend(self, session) self._backend_session = None session.on("join", _on_backend_joined) # if the client is required to do an authentication message exchange, answer sending a CHALLENGE message elif isinstance(hello_result, types.Challenge): self._send_if_transport_is_alive(message.Challenge(hello_result.method, extra=hello_result.extra)) # if the client is denied right away, answer by sending an ABORT message elif isinstance(hello_result, types.Deny): self._send_if_transport_is_alive(message.Abort(hello_result.reason, message=hello_result.message)) else: # should not arrive here: internal (logic) error self.log.warn( "{func} internal error: unexpected authenticator return type {rtype}", rtype=hltype(hello_result), func=hltype(self._process_Hello), ) self._send_if_transport_is_alive( message.Abort( ApplicationError.AUTHENTICATION_FAILED, message="internal error: unexpected authenticator return type {}".format( type(hello_result) ), ) ) return self._send_if_transport_is_alive( message.Abort(ApplicationError.NO_AUTH_METHOD, message="no suitable authmethod found") ) @inlineCallbacks def _process_Authenticate(self, msg): self.log.debug( "{func} proxy frontend session process AUTHENTICATE (msg={msg})", func=hltype(self._process_Authenticate), msg=msg, ) if self._pending_auth: if ( isinstance(self._pending_auth, PendingAuthTicket) or isinstance(self._pending_auth, PendingAuthWampCra) or isinstance(self._pending_auth, PendingAuthCryptosign) or isinstance(self._pending_auth, PendingAuthScram) ): auth_result = yield as_future(self._pending_auth.authenticate, msg.signature) self.log.debug( "{func} processed pending authentication {pending_auth}: {authresult}", func=hltype(self._process_Authenticate), pending_auth=self._pending_auth, authresult=auth_result, ) # check if client disconnected while we were yielding to authenticator if not self.transport: # we have not yet established a backend session, only authenticator session was used self.log.info( "{func} frontend disconnected while processing pending" " authentication {pending_auth}: {authresult}", func=hltype(self._process_Authenticate), pending_auth=self._pending_auth, authresult=auth_result, ) else: if isinstance(auth_result, types.Accept): try: session = yield self.frontend_accepted(auth_result) except TransportLost: self.log.info( "{func} frontend disconnected while connecting backend session {pending_auth}", func=hltype(self._process_Authenticate), pending_auth=self._pending_auth, ) except Exception as e: self.log.failure() self._send_if_transport_is_alive( message.Abort( ApplicationError.AUTHENTICATION_FAILED, message="Frontend connection accept failed ({})".format(e), ) ) else: if self.transport is None: # we have not yet established a backend session, only authenticator session was used self.log.info( "{func} frontend disconnected connecting backend session {pending_auth}", func=hltype(self._process_Authenticate), pending_auth=self._pending_auth, ) self._controller.unmap_backend(self, session) self._backend_session = None else: def _on_backend_joined(session, details): msg = message.Welcome( self._session_id, ProxyFrontendSession.ROLES, realm=details.realm, authid=details.authid, authrole=details.authrole, authmethod=auth_result.authmethod, authprovider=auth_result.authprovider, authextra=dict(details.authextra or {}, **self._custom_authextra), ) if self.transport: self._backend_session = session self.transport.send(msg) self.log.debug( "{func} proxy frontend session WELCOME: session_id={session_id}, " "session={session}, msg={msg}", func=hltype(self._process_Authenticate), session_id=hlid(self._session_id), session=self, msg=msg, ) else: self.log.info( "{func} frontend disconnected while joining backend session {pending_auth}", func=hltype(self._process_Authenticate), pending_auth=self._pending_auth, ) self._controller.unmap_backend(self, session) self._backend_session = None session.on("join", _on_backend_joined) elif isinstance(auth_result, types.Deny): self._send_if_transport_is_alive( message.Abort(auth_result.reason, message=auth_result.message) ) else: # should not arrive here: logic error self.log.warn( "{func} internal error: unexpected authenticator return type {rtype}", rtype=hltype(auth_result), func=hltype(self._process_Authenticate), ) self._send_if_transport_is_alive( message.Abort( ApplicationError.AUTHENTICATION_FAILED, message="internal error: unexpected authenticator return type {}".format( type(auth_result) ), ) ) else: # should not arrive here: logic error self._send_if_transport_is_alive( message.Abort( ApplicationError.AUTHENTICATION_FAILED, message="internal error: unexpected pending authentication", ) ) else: # should not arrive here: client misbehaving! self._send_if_transport_is_alive( message.Abort(ApplicationError.AUTHENTICATION_FAILED, message="no pending authentication") ) ITransportHandler.register(ProxyFrontendSession) class ProxyBackendSession(Session): """ This is a single WAMP session to the real backend service There is one of these for every client connection. (In the future, we could multiplex over a single backend connection -- for now, there's a backend connection per frontend client). """ def onOpen(self, transport): self.log.debug("{func}(transport={transport})", func=hltype(self.onOpen), transport=transport) # instance of Frontend (frontend_session) self._frontend = transport._proxy_other_side self._on_connect = Deferred() self._on_ready = Deferred() return Session.onOpen(self, transport) def onConnect(self): """ The base class will call .join() which we do NOT want to do; instead we await the frontend sending its hello and forward that along. """ self.log.debug("{func}(): on_connect={on_connect}", func=hltype(self.onConnect), on_connect=self._on_connect) self._on_connect.callback(None) def onChallenge(self, challenge): self.log.debug("{func}(challenge={})", func=hltype(self.onChallenge), challenge=challenge) if challenge.method == "cryptosign-proxy": return super(ProxyBackendSession, self).onChallenge(types.Challenge("cryptosign", extra=challenge.extra)) return super(ProxyBackendSession, self).onChallenge(challenge) def onWelcome(self, _msg): self.log.debug("{func}(message={msg})", func=hltype(self.onWelcome), msg=_msg) # This is WRONG: # if msg.authmethod == "cryptosign-proxy": # msg.authmethod = "cryptosign" # elif msg.authmethod == "anonymous-proxy": # msg.authmethod = "anonymous" return super(ProxyBackendSession, self).onWelcome(_msg) def onJoin(self, details): self.log.info( "{func} proxy backend session joined (authmethod={authmethod}, authprovider={authprovider}): " 'realm="{realm}", authid="{authid}", authrole="{authrole}"', func=hltype(self.onJoin), realm=hlid(details.realm), authid=hlid(details.authid), authrole=hlid(details.authrole), authmethod=hlval(details.authmethod), authprovider=hlval(details.authprovider), ) if not self._on_ready.called: self._on_ready.callback(self) def onClose(self, wasClean): self.log.info( "{func} proxy backend session closed (wasClean={wasClean})", func=hltype(self.onClose), wasClean=wasClean ) if self._frontend is not None and self._frontend.transport is not None: try: if self._session_id: self._frontend.transport.send(message.Goodbye()) else: self._frontend.transport.send(message.Abort(ApplicationError.AUTHENTICATION_FAILED)) except Exception as e: self.log.info( "Backend closed, Abort/Goodbye to frontend failed: {fail}", fail=e, ) self._frontend = None super(ProxyBackendSession, self).onClose(wasClean) def onMessage(self, msg: IMessage): if isinstance(msg, (message.Welcome, message.Challenge, message.Abort, message.Goodbye)): super(ProxyBackendSession, self).onMessage(msg) else: # msg is a real WAMP message that our backend WAMP protocol has deserialized. so now we re-serialize it # for whatever the frontend is speaking and forward self._frontend._forward(msg) def make_backend_connection( reactor: ReactorBase, controller: "ProxyController", backend_config: Dict[str, Any], frontend_session: ApplicationSession, ) -> Deferred: """ Create a connection to a router backend, wiring up the given proxy frontend session to forward WAMP in both directions between the frontend and backend sessions. Backend configuration example: .. code-block:: json { 'auth': { 'cryptosign-proxy': { 'type': 'static' } }, 'transport': { 'type': 'rawsocket', 'endpoint': { 'type': 'tcp', 'host': '127.0.0.1', 'port': 8442 }, 'serializer': 'cbor', 'url': 'rs://localhost' } } :param reactor: Twisted reactor to use. :param controller: The proxy controller the backend connection originates from. :param cbdir: This node's directory. :param backend_config: Proxy backend connection. :param frontend_session: The proxy frontend session for which to create a mapped backend connection. :return: A deferred that resolves with a proxy backend session that is joined on the realm, under the authrole, as the proxy frontend session. """ log.info( "{func} proxy connecting to backend with backend_config=\n{backend_config}", func=hltype(make_backend_connection), backend_config=pformat(backend_config), ) cbdir = controller.cbdir # fired when the component has connected, authenticated and joined a realm on the backend node ready = Deferred() # connected node transport backend = _create_transport(0, backend_config["transport"]) # connecting node (this node) private key key = _read_node_key(cbdir, private=True) # factory for proxy->router backend connections, uses authentication (to router backend worker) def create_session(): # this is our WAMP session to the backend session = ProxyBackendSession() authextra = {} log.debug( "{func}::create_session() connecting to backend with authextra=\n{authextra}", func=hltype(make_backend_connection), authextra=pformat(authextra), ) # if auth is configured and includes "cryptosign-proxy", always prefer # that and connect to the backend node authenticating with WAMP-cryptosign # using the connecting proxy node's key # # authentication via WAMP-cryptosign SHOULD always be possible with the backend node # if "auth" in backend_config and "cryptosign-proxy" in backend_config["auth"]: session.add_authenticator( create_authenticator("cryptosign-proxy", privkey=key["hex"], authextra=authextra) ) log.debug( "{func} using cryptosign-proxy authenticator for backend connection, authextra=\n{authextra}", func=hltype(make_backend_connection), authextra=pformat(authextra), ) # if auth is not configured, or is configured and includes "anonymous-proxy", # try to connect to the backend node authenticating with WAMP-anonymous # # authentication via WAMP-anonymous MAY be possible with the backend node if enabled # elif "auth" not in backend_config or "anonymous-proxy" in backend_config["auth"]: # IMPORTANT: this is security sensitive! we only allow anonymous proxy # locally on a host, that is, when the transport type is Unix domain socket if backend_config["transport"]["endpoint"]["type"] == "unix": session.add_authenticator(create_authenticator("anonymous-proxy", authextra=authextra)) log.debug( "{func} using anonymous-proxy over UDS authenticator for backend connection, " "authextra=\n{authextra}", func=hltype(make_backend_connection), authextra=pformat(authextra), ) else: raise RuntimeError( 'anonymous-proxy authenticator only allowed on Unix domain socket based transports, not type "{}"'.format( backend_config["transport"]["endpoint"]["type"] ) ) # no valid authentication method found else: raise RuntimeError("could not determine valid authentication method to connect to the backend node") def connected(new_session, transport): ready.callback(new_session) session.on("connect", connected) return session # client transport factory to carry our session factory = _create_transport_factory(reactor, backend, create_session) # reduce noise from logs, otherwise for each connect/disconnect to the backend factory.noisy = False endpoint = _create_transport_endpoint(reactor, backend_config["transport"]["endpoint"]) transport_d = endpoint.connect(factory) def _connected(proto): proto._proxy_other_side = frontend_session return proto def _error(f): # backend session disconnected without ever having joined before if not ready.called: ready.errback(f) transport_d.addErrback(_error) transport_d.addCallback(_connected) return ready class AuthenticatorSession(ApplicationSession): # when running over TLS, require TLS channel binding # CHANNEL_BINDING = 'tls-unique' CHANNEL_BINDING = None def __init__(self, config=None): ApplicationSession.__init__(self, config) # load the client private key (raw format) try: self._key = cryptosign.CryptosignKey.from_bytes(config.extra["key"]) except: self.log.failure() if self.is_attached(): self.leave() else: self.log.info( "{func} client public key loaded: {pubkey}", pubkey=hlval(self._key.public_key()), func=hltype(self.__init__), ) def onConnect(self): extra = { "pubkey": self._key.public_key(), } self.join( self.config.realm, authmethods=["cryptosign"], authid=self.config.extra.get("authid", None), authextra=extra, ) async def onChallenge(self, challenge): try: channel_id_type = self.CHANNEL_BINDING channel_id = self.transport.transport_details.channel_id.get(self.CHANNEL_BINDING, None) signed_challenge = await self._key.sign_challenge( challenge, channel_id=channel_id, channel_id_type=channel_id_type ) return signed_challenge except: self.log.failure() raise def onJoin(self, details): if self.config.extra["ready"]: self.config.extra["ready"].callback(self) self.config.extra["ready"] = None def onLeave(self, details): self.log.info("{func} session closed (details={details})", details=details, func=hltype(self.onDisconnect)) def onDisconnect(self): self.log.info("{func} connection closed", func=hltype(self.onDisconnect)) def make_service_session( reactor: ReactorBase, controller: "ProxyController", backend_config: Dict[str, Any], realm: str, authrole: str ) -> Deferred: """ Create a connection to a router backend, creating a new service session. :param reactor: Twisted reactor to use. :param controller: The proxy controller this service session is for. :param backend_config: Proxy backend connection. :param realm: The WAMP realm the service session is joined on. :param authrole: The WAMP authrole the service session is joined as. :return: A service session joined on the given realm, under the given authrole. """ cbdir = controller.cbdir # authid of the proxy session forwarded to the backend: for service session that are # not forwarding incoming session (like make_backend_session), but represent an # independent session (exposed on the proxy), we synthesize an authid proxy_authid = "proxy-{}".format(util.generate_serial_number()) # authid of the connecting backend (proxy service) session is this proxy node's ID backend_authid = controller.node_id # if auth is configured and includes "cryptosign-proxy", always prefer # that and connect to the backend node authenticating with WAMP-cryptosign # using the connecting proxy node's key # # authentication via WAMP-cryptosign SHOULD always be possible with the backend node # if "auth" in backend_config and "cryptosign-proxy" in backend_config["auth"]: # we will do cryptosign authentication to any backend node # FIXME: get node private key from this proxy node node_privkey = _read_node_key(cbdir, private=True)["hex"] authentication = { "cryptosign-proxy": { "privkey": node_privkey, "authid": backend_authid, "authextra": { "proxy_realm": realm, "proxy_authid": proxy_authid, "proxy_authrole": authrole, "proxy_authextra": None, }, } } # if auth is not configured, or is configured and includes "anonymous-proxy", # try to connect to the backend node authenticating with WAMP-anonymous # # authentication via WAMP-anonymous MAY be possible with the backend node if enabled # elif "auth" not in backend_config or "anonymous-proxy" in backend_config["auth"]: # IMPORTANT: this is security sensitive! we only allow anonymous proxy # locally on a host, that is, when the transport type is Unix domain socket if backend_config["transport"]["endpoint"]["type"] == "unix": authentication = { "anonymous-proxy": { "authid": backend_authid, "authextra": { "proxy_realm": realm, "proxy_authid": proxy_authid, "proxy_authrole": authrole, "proxy_authextra": None, }, } } else: raise RuntimeError( 'anonymous-proxy authenticator only allowed on Unix domain socket based transports, not type "{}"'.format( backend_config["transport"]["endpoint"]["type"] ) ) # no valid authentication method found else: raise RuntimeError("could not determine valid authentication method to connect to the backend node") # use Component API and create a component for the service session comp = Component(transports=[backend_config["transport"]], realm=realm, authentication=authentication) # fired when the component has connected, authenticated and joined a realm on the backend node ready = Deferred() @comp.on_join def joined(session, details): ready.callback(session) @comp.on_disconnect def disconnect(session, was_clean=False): if not ready.called: ready.errback(RuntimeError("backend session disconnected without ever having joined before")) # start the component and return the component's ready deferred comp.start(reactor) return ready STATE_CREATED = 1 STATE_STARTING = 2 STATE_STARTED = 3 STATE_FAILED = 4 STATE_STOPPING = 5 STATE_STOPPED = 6 STATES = { STATE_CREATED: "created", STATE_STARTING: "starting", STATE_STARTED: "started", STATE_FAILED: "failed", STATE_STOPPING: "stopping", STATE_STOPPED: "stopped", }
[docs] class ProxyRoute(object): """ Proxy route run-time representation. """
[docs] log = make_logger()
def __init__(self, controller: "ProxyController", realm_name: str, route_id: str, config: Dict[str, Any]): """ :param controller: The (proxy) worker controller session the proxy connection is created from. :param realm_name: The realm this route applies to. :param route_id: The run-time route ID within the proxy worker. :param config: The proxy route's configuration, which is a dictionary of role names and connection IDs as values. """
[docs] self._controller = controller
[docs] self._realm_name = realm_name
[docs] self._route_id = route_id
[docs] self._config = config
[docs] self._started = None
[docs] self._stopped = None
[docs] self._state = STATE_CREATED
[docs] def marshal(self) -> Dict[str, Any]: return { "realm": self._realm_name, "id": self._route_id, "config": self._config, "started": self._started, "stopped": self._stopped, "state": self._state, }
[docs] def __str__(self): return pformat(self.marshal())
@property
[docs] def realm(self) -> str: """ :return: The realm this route applies to. """ return self._realm_name
[docs] def has_role(self, role_name) -> bool: """ Checks if the given role is mapped in this proxy route. :param role_name: Role to lookup. :return: ``True`` if the role is configured in this proxy route. """ return role_name in self._config
[docs] def map_connection_id(self, role_name) -> Optional[str]: """ Map the given role to a connection ID according to the configuration of this route. :param role_name: Role to map. :return: Connection ID configured for the role in this proxy route. """ if role_name in self._config: return self._config[role_name] else: return None
@property
[docs] def id(self) -> str: """ :return: The ID of this proxy route. """ return self._route_id
@property
[docs] def config(self) -> Dict[str, Any]: """ :return: The original configuration as supplied to this proxy route. """ return self._config
@property
[docs] def started(self) -> Optional[int]: """ :return: When this route was started in it's hosting worker. """ return self._started
@property
[docs] def stopped(self) -> Optional[int]: """ :return: When this route was stopped in it's hosting worker. """ return self._stopped
@property
[docs] def state(self) -> int: """ :return: Current state of route in it's hosting worker. """ return self._state
@inlineCallbacks
[docs] def start(self): """ Start proxy route. """ assert self._state == STATE_CREATED self._state = STATE_STARTING topic = "{}.on_proxy_route_starting".format(self._controller._uri_prefix) yield self._controller.publish(topic, self.marshal(), options=types.PublishOptions(acknowledge=True)) self._state = STATE_STARTED self._started = time_ns() self._stopped = None topic = "{}.on_proxy_route_started".format(self._controller._uri_prefix) yield self._controller.publish(topic, self.marshal(), options=types.PublishOptions(acknowledge=True)) self.log.info( '{func} proxy route {route_id} started for realm "{realm}" with config=\n{config}', func=hltype(self.start), route_id=hlid(self._route_id), realm=hlval(self._realm_name), config=pformat(self._config), )
@inlineCallbacks
[docs] def stop(self): """ Stop proxy route. """ assert self._state == STATE_STARTED self._state = STATE_STOPPING topic = "{}.on_proxy_route_stopping".format(self._controller._uri_prefix) yield self._controller.publish(topic, self.marshal(), options=types.PublishOptions(acknowledge=True)) self._state = STATE_STOPPED self._started = None self._stopped = time_ns() topic = "{}.on_proxy_route_stopped".format(self._controller._uri_prefix) yield self._controller.publish(topic, self.marshal(), options=types.PublishOptions(acknowledge=True)) self.log.info( '{func} proxy route {route_id} stopped for realm "{realm}"', func=hltype(self.stop), route_id=hlid(self._route_id), realm=hlval(self._realm_name), )
[docs] class ProxyConnection(object): """ Proxy connection run-time representation. """
[docs] log = make_logger()
def __init__(self, controller: "ProxyController", connection_id: str, config: Dict[str, Any]): """ Example connection configuration for a Unix domain socket based connection using WAMP-anonymous proxy authentication: .. code-block:: json { "transport": { "type": "rawsocket", "endpoint": { "type": "unix", "path": "router.sock" }, "url": "ws://localhost", "serializer": "cbor" }, "auth": { "anonymous-proxy": { "type": "static" } } } Example connection configuration for a TCP based connection using WAMP-cryptosign proxy authentication: .. code-block:: json { "transport": { "type": "rawsocket", "endpoint": { "type": "tcp", "host": "core1", "port": 10023 }, "url": "ws://core1", "serializer": "cbor" }, "auth": { "cryptosign-proxy": { "type": "static" } } } :param controller: The (proxy) worker controller session the proxy connection is created from. :param connection_id: The run-time connection ID within the proxy worker. :param config: The proxy connection's configuration. """
[docs] self._controller = controller
[docs] self._connection_id = connection_id
[docs] self._config = config
[docs] self._started = None
[docs] self._stopped = None
[docs] self._state = STATE_CREATED
[docs] def marshal(self): return { "id": self._connection_id, "config": self._config, "started": self._started, "stopped": self._stopped, "state": self._state, }
[docs] def __str__(self): return pformat(self.marshal())
@property
[docs] def id(self) -> str: """ :return: The ID of this proxy backend connection. """ return self._connection_id
@property
[docs] def config(self) -> Dict[str, Any]: """ :return: The original configuration as supplied to this proxy backend connection. """ return self._config
@property
[docs] def started(self) -> Optional[int]: """ :return: When this proxy backend connection was started (Posix time in ns). """ return self._started
@property
[docs] def stopped(self) -> Optional[int]: """ :return: When this proxy backend connection was stopped (Posix time in ns). """ return self._stopped
@property
[docs] def state(self) -> int: """ :return: Current state of this proxy backend connection. """ return self._state
@inlineCallbacks
[docs] def start(self): """ Start this proxy backend connection. """ assert self._state == STATE_CREATED self._state = STATE_STARTING topic = "{}.on_proxy_connection_starting".format(self._controller._uri_prefix) yield self._controller.publish(topic, self.marshal(), options=types.PublishOptions(acknowledge=True)) self._state = STATE_STARTED self._started = time_ns() self._stopped = None topic = "{}.on_proxy_connection_started".format(self._controller._uri_prefix) yield self._controller.publish(topic, self.marshal(), options=types.PublishOptions(acknowledge=True))
@inlineCallbacks
[docs] def stop(self): """ Stop this proxy backend connection. """ assert self._state == STATE_STARTED self._state = STATE_STOPPING topic = "{}.on_proxy_connection_stopping".format(self._controller._uri_prefix) yield self._controller.publish(topic, self.marshal(), options=types.PublishOptions(acknowledge=True)) self._state = STATE_STOPPED self._started = None self._stopped = time_ns() topic = "{}.on_proxy_connection_stopped".format(self._controller._uri_prefix) yield self._controller.publish(topic, self.marshal(), options=types.PublishOptions(acknowledge=True))
[docs] class ProxyController(TransportController): """ Controller for proxy workers. Manages: * **proxy transports**, for accepting incoming client connections * **proxy connections**, for backend router connections * **proxy routes**, for routes from ``(realm_name, role_name)`` to backend router connections and * web transport services (from base class `TransportController`), when running a proxy transport of type ``web``. Proxy controllers also inherit more procedures and events from the base classes * :class:`crossbar.worker.transport.TransportController`, * :class:`crossbar.worker.controller.WorkerController` and * :class:`crossbar.common.process.NativeProcess`. """
[docs] WORKER_TYPE = "proxy"
[docs] WORKER_TITLE = "WAMP proxy"
def __init__(self, config=None, reactor=None, personality=None): super(ProxyController, self).__init__( config=config, reactor=reactor, personality=personality, ) # the Twisted reactor under which to run
[docs] self._reactor = reactor
# the node's home directory of this worker
[docs] self._cbdir = config.extra.cbdir
# map: connection_id -> ProxyConnection
[docs] self._connections: Dict[str, ProxyConnection] = {}
# map: (realm, authrole) -> Set[ProxyConnection]
[docs] self._connections_by_auth: Dict[Tuple[str, str], Set[ProxyConnection]] = {}
# map: realm_name -> route_id -> ProxyRoute
[docs] self._routes: Dict[str, Dict[str, ProxyRoute]] = {}
# map: connection_id -> Set[ProxyRoute]
[docs] self._routes_by_connection: Dict[str, Set[ProxyRoute]] = {}
# for creating route IDs
[docs] self._next_route_id = 0
# next route to use in a realm while forwarding connections # map: (realm, authrole) -> int
[docs] self._roundrobin_idx = {}
# since we share some functionality with RouterController we # need to have a router_session_factory
[docs] self._router_factory = RouterFactory( self.config.extra.node, self.config.extra.worker, self, # ProxySession get to ProxyController via .worker here )
[docs] self._router_session_factory = RouterSessionFactory(self._router_factory)
self._router_session_factory.session = ProxyFrontendSession # currently mapped session: map of frontend_session => backend_session
[docs] self._backends_by_frontend = {}
# map: (realm_name, role_name) -> ProxyRoute
[docs] self._service_sessions = {}
[docs] def has_realm(self, realm: str) -> bool: """ Check if a route to a realm with the given name is currently running. :param realm: Realm name (the WAMP name, _not_ the run-time object ID). :returns: True if a route to the realm (for any role) exists. """ result = realm in self._routes self.log.debug( '{func}(realm="{realm}") -> {result}', func=hltype(ProxyController.has_realm), realm=hlid(realm), result=hlval(result), ) return result
[docs] def has_role(self, realm: str, authrole: str) -> bool: """ Check if a role with the given name is currently running in the given realm. :param realm: WAMP realm (the WAMP name, _not_ the run-time object ID). :param authrole: WAMP authentication role (the WAMP URI, _not_ the run-time object ID). :returns: True if a route to the realm for the role exists. """ authrole = authrole or "trusted" if realm in self._routes: realm_routes = self._routes[realm] # the route config is a map with role name as key result = any(authrole in route.config for route in realm_routes.values()) else: realm_routes = None result = False self.log.debug( '{func}(realm="{realm}", authrole="{authrole}") -> {result} [routes={routes}, realm_routes={realm_routes}]', func=hltype(ProxyController.has_role), realm=hlid(realm), authrole=hlid(authrole), result=hlval(result), realm_routes=hlval([route.config for route in realm_routes.values()] if realm_routes else []), routes=sorted(self._routes.keys()), ) return result
@inlineCallbacks
[docs] def get_service_session(self, realm: str, authrole: str) -> ApplicationSession: """ Returns a cached service session on the given realm using the given role. Service sessions are used for: * access dynamic authenticators (see :method:`crossbar.router.auth.pending.PendingAuth._init_dynamic_authenticator`) * access the WAMP meta API for the realm * forward to/from WAMP for the HTTP bridge Service sessions are NOT used to forward WAMP client connections incoming to the proxy worker. :param realm: WAMP realm (the WAMP name, _not_ the run-time object ID). :param authrole: WAMP authentication role (the WAMP URI, _not_ the run-time object ID). :returns: The service session for the realm. """ try: self.log.info( '{klass}.get_service_session(realm="{realm}", authrole="{authrole}")', klass=self.__class__.__name__, realm=realm, authrole=authrole, ) # create new service session for (realm, authrole) if it doesn't exist yet .. # .. check for realm if realm not in self._service_sessions: if self.has_realm(realm): self.log.info( '{klass}.get_service_session(realm="{realm}", authrole="{authrole}") -> ' "not cached, creating new session ..", klass=self.__class__.__name__, realm=realm, authrole=authrole, ) self._service_sessions[realm] = {} else: # mark as non-existing! self._service_sessions[realm] = None # .. check for (realm, authrole) if self._service_sessions[realm] is not None and authrole not in self._service_sessions[realm]: if self.has_role(realm, authrole): # get backend connection configuration selected (round-robin or randomly) from all routes # for the desired (realm, authrole) backend_config = self.get_backend_config(realm, authrole) # create and store a new service session connected to the backend router worker self._service_sessions[realm][authrole] = make_service_session( self._reactor, self, backend_config, realm, authrole ) else: # mark as non-existing! self._service_sessions[realm][authrole] = None if self._service_sessions[realm] and self._service_sessions[realm][authrole]: service_session_or_deferred = self._service_sessions[realm][authrole] if isinstance(service_session_or_deferred, Deferred): service_session = yield service_session_or_deferred if service_session is not None: self._service_sessions[realm][authrole] = service_session # return cached service session if self._service_sessions[realm] and self._service_sessions[realm][authrole]: service_session = self._service_sessions[realm][authrole] self.log.info( '{klass}.get_service_session(realm="{realm}", authrole="{authrole}") -> found cached service ' 'session {session} with authid "{session_authid}" and authrole "{session_authrole}"', klass=self.__class__.__name__, realm=realm, authrole=authrole, session=service_session.session_id, session_authid=service_session.authid, session_authrole=service_session.authrole, ) return service_session else: self.log.warn( '{klass}.get_service_session(realm="{realm}", authrole="{authrole}") -> no such realm/authrole!', klass=self.__class__.__name__, realm=realm, authrole=authrole, ) return None except: self.log.failure() raise
[docs] def can_map_backend(self, session_id, realm, authid, authrole, authextra): """ Checks if the proxy can map the incoming frontend session to a backend. :returns: True only-if map_backend() can succeed later for the same args (essentially, if the realm + role exist). """ return self.has_realm(realm) and self.has_role(realm, authrole)
@inlineCallbacks
[docs] def map_backend(self, frontend, realm: str, authid: str, authrole: str, authextra: Optional[Dict[str, Any]]): """ Returns the cached backend forwarding session for the given frontend session. Map the given frontend session to a backend session under the given authentication credentials. :param frontend: :param realm: :param authid: :param authrole: :param authextra: :return: a protocol instance connected to the backend """ self.log.debug( '{func}(frontend={frontend}, realm="{realm}", authid="{authid}", authrole="{authrole}", ' "authextra={authextra})", func=hltype(self.map_backend), frontend=frontend, realm=hlid(realm), authid=hlid(authid), authrole=hlid(authrole), authextra=authextra, ) if frontend in self._backends_by_frontend: backend = self._backends_by_frontend[frontend] self.log.info("{func} CACHE HIT {backend}", func=hltype(self.map_backend), backend=backend) return backend backend_config = self.get_backend_config(realm, authrole) # if auth uses cryptosign but has no privkey, we'd ideally # insert the node's private key if authrole is None: if len(self._routes.get(realm, set())) != 1: raise RuntimeError("Cannot select default role unless realm has exactly 1") self.log.debug( '{func} CACHE MISS - opening new proxy backend connection for realm "{realm}", authrole "{authrole}" ' "using backend_config=\n{backend_config}", func=hltype(self.map_backend), backend_config=pformat(backend_config), realm=hlid(realm), authrole=hlid(authrole), ) try: backend_proto = yield make_backend_connection(self._reactor, self, backend_config, frontend) except DNSLookupError as e: self.log.warn( "{func} proxy worker could not connect to router backend: DNS resolution failed ({error})", func=hltype(self.map_backend), error=str(e), ) raise e if frontend: self._backends_by_frontend[frontend] = backend_proto self.log.debug( '{func} proxy backend connection opened mapping frontend session to realm "{realm}", authrole "{authrole}"', func=hltype(self.map_backend), backend_config=pformat(backend_config), realm=hlid(realm), authrole=hlid(authrole), ) returnValue(backend_proto)
[docs] def unmap_backend(self, frontend, backend, leave_reason=None, leave_message=None): """ Unmap the backend session from the given frontend session it is currently mapped to. """ self.log.debug( "{func}(frontend={frontend}, backend={backend})", func=hltype(self.unmap_backend), frontend=frontend, backend=backend, ) if frontend in self._backends_by_frontend: if self._backends_by_frontend[frontend] == backend: # alright, the given frontend is indeed currently mapped to the given backend session: close the # session and delete it backend.leave(reason=leave_reason, message=leave_message) del self._backends_by_frontend[frontend] self.log.debug( "{func} unmapped frontend session {frontend_session_id} from backend session {backend_session_id}", func=hltype(self.unmap_backend), frontend_session_id=hlid(frontend._session_id), backend_session_id=hlid(backend._session_id), ) else: self.log.warn( "{func} frontend session {frontend_session_id} currently mapped to backend session " "{backend_session_id} - NOT to specified backend {specified_session_id}".format( func=hltype(self.unmap_backend), frontend_session_id=hlid(frontend._session_id), backend_session_id=hlid(self._backends_by_frontend[frontend]._session_id), specified_session_id=hlid(backend._session_id), ) ) else: if frontend: self.log.warn( "{func} frontend session {session_id} not currently mapped to any backend", func=hltype(self.unmap_backend), session_id=hlid(frontend._session_id), )
[docs] def get_backend_config(self, realm_name, role_name): """ Return backend connection information for the given backend realm and role. :returns: a dict containing the connection configuration for the backend identified by the realm_name and role_name """ assert self.has_role(realm_name, role_name), ( "missing (realm_name={}, role_name={}) in ProxyController routes".format(realm_name, role_name) ) key = realm_name, role_name if key not in self._roundrobin_idx: self._roundrobin_idx[key] = 0 else: self._roundrobin_idx[key] += 1 idx = self._roundrobin_idx[key] % len(self._connections_by_auth[key]) connection = list(self._connections_by_auth[key])[idx] return connection.config
@inlineCallbacks
[docs] def onJoin(self, details): """ Called when worker process has joined the node's management realm. """ self.log.info( '{func} proxy worker "{worker_id}" session {session_id} initializing', func=hltype(self.onJoin), worker_id=hlid(self._worker_id), session_id=hlid(details.session), ) yield WorkerController.onJoin(self, details, publish_ready=False) yield self.publish_ready()
@wamp.register(None)
[docs] def get_proxy_transports(self, details=None): """ Get proxy (listening) transports currently running in this proxy worker. :param details: Call details. :type details: :class:`autobahn.wamp.types.CallDetails` :returns: List of transport IDs of transports currently running. :rtype: list """ self.log.debug( '{func}(caller_authid="{caller_authid}")', func=hltype(self.get_proxy_transports), caller_authid=hlval(details.caller_authid), ) return sorted(self.transports.keys())
@wamp.register(None)
[docs] def get_proxy_transport(self, transport_id, details=None): """ Get transport currently running in this proxy worker. :param details: Call details. :type details: :class:`autobahn.wamp.types.CallDetails` :returns: List of transports currently running. :rtype: dict """ self.log.debug( "{func}(transport_id={transport_id})", func=hltype(self.get_proxy_transport), transport_id=hlid(transport_id), caller_authid=hlval(details.caller_authid), ) if transport_id in self.transports: transport = self.transports[transport_id] return transport.marshal() else: raise ApplicationError("crossbar.error.no_such_object", "No transport {}".format(transport_id))
@inlineCallbacks @wamp.register(None)
[docs] def start_proxy_transport(self, transport_id, config, details=None): """ Start a new proxy front-end listening transport. :param transport_id: The run-time ID to start the transport under. :param config: The listening transport configuration. :param details: WAMP call details. :return: Proxy transport run-time metadata. """ self.log.info( '{func}(transport_id="{transport_id}", config={config})', func=hltype(self.start_proxy_transport), transport_id=hlid(transport_id), config="...", caller_authid=hlval(details.caller_authid), ) # prohibit starting a transport twice if transport_id in self.transports: _emsg = 'Could not start transport: a transport with ID "{}" is already running (or starting)'.format( transport_id ) self.log.error(_emsg) raise ApplicationError("crossbar.error.already_running", _emsg) # create a transport and parse the transport configuration # (NOTE: yes, this is re-using create_router_transport so we # can proxy every service a 'real' router can) proxy_transport = self.personality.create_router_transport(self, transport_id, config) caller = details.caller if details else None transport_started = proxy_transport.marshal() self.publish( "{}.on_proxy_transport_starting".format(self._uri_prefix), transport_started, options=types.PublishOptions(exclude=caller), ) # start listening .. try: yield proxy_transport.start(False) except Exception as err: _emsg = "Cannot listen on transport endpoint: {log_failure}" self.log.error(_emsg, log_failure=err) self.publish( "{}.on_proxy_transport_stopped".format(self._uri_prefix), transport_started, options=types.PublishOptions(exclude=caller), ) raise ApplicationError("crossbar.error.cannot_listen", _emsg.format(log_failure=err)) self.transports[transport_id] = proxy_transport self.publish( "{}.on_proxy_transport_started".format(self._uri_prefix), transport_started, options=types.PublishOptions(exclude=caller), ) self.log.info( '{func} proxy transport "{transport_id}" started and listening!', func=hltype(self.start_proxy_transport), transport_id=hlid(transport_id), ) returnValue(proxy_transport.marshal())
@inlineCallbacks @wamp.register(None)
[docs] def stop_proxy_transport(self, transport_id, details=None): """ Stop a currently running proxy front-end listening transport. :param transport_id: The run-time ID of the transport to stop. :param details: WAMP call details. :return: Proxy transport run-time information. """ if transport_id not in self._transports: raise ApplicationError( "crossbar.error.no_such_object", 'no proxy transport with ID "{}" currently running'.format(transport_id), ) caller = details.caller if details else None transport_stopped = self._transports[transport_id].marshal() self.publish( "{}.on_proxy_transport_stopping".format(self._uri_prefix), transport_stopped, options=types.PublishOptions(exclude=caller), ) yield self._transports[transport_id].port.stopListening() del self._transports[transport_id] self.publish( "{}.on_proxy_transport_stopping".format(self._uri_prefix), transport_stopped, options=types.PublishOptions(exclude=caller), ) return transport_stopped
@wamp.register(None)
[docs] def get_proxy_routes(self, details=None): """ Get proxy routes currently running in this proxy worker. :param details: Call details. :type details: :class:`autobahn.wamp.types.CallDetails` :returns: List of (target) realm names in proxy routes currently running. :rtype: list """ self.log.debug( '{func}(caller_authid="{caller_authid}")', func=hltype(self.get_proxy_routes), caller_authid=hlval(details.caller_authid), ) return sorted(self._routes.keys())
@wamp.register(None)
[docs] def list_proxy_realm_routes(self, realm_name, details=None): """ Get list of all routes enabled for a particular realm """ if realm_name in self._routes: return [self._routes[realm_name][route_id].marshal() for route_id in self._routes[realm_name].keys()] else: raise ApplicationError( "crossbar.error.no_such_object", 'No route for realm "{}" in proxy'.format(realm_name) )
@wamp.register(None)
[docs] def get_proxy_realm_route(self, realm_name, route_id, details=None): """ Get a particular realm-route :param details: Call details. :type details: :class:`autobahn.wamp.types.CallDetails` :returns: Proxy route object. :rtype: dict """ self.log.debug( "{func}(realm_name={realm_name})", func=hltype(self.get_proxy_realm_route), realm_name=hlid(realm_name), caller_authid=hlval(details.caller_authid), ) try: return self._routes[realm_name][route_id] except KeyError: raise ApplicationError( "crossbar.error.no_such_object", 'No route "{}" for realm "{}" in proxy'.format(route_id, realm_name) )
@inlineCallbacks @wamp.register(None)
[docs] def start_proxy_realm_route(self, realm_name, config, details=None): """ Start a new proxy route for the given realm. A proxy route maps authroles on the given realm to proxy backend connection IDs. Example route configuration: .. code-block:: json { "anonymous": "conn1", "restbridge": "conn1", "user": "conn2" } In this example, the two specified connections ``"conn1"`` and ``"conn2"`` must be running already. :param realm_name: The realm this route should apply for. :param config: The route configuration. :param details: WAMP call details. :return: Proxy route run-time information. """ self.log.info( '{func}(realm_name="{realm_name}", config={config})', func=hltype(self.start_proxy_realm_route), realm_name=realm_name, config=config, ) # check that we already know about all connections specified in the route connection_ids = set() for role_name in config.keys(): connection_id = config[role_name] if connection_id not in self._connections: raise ApplicationError( "crossbar.error.no_such_object", 'no connection "{}" found for realm "{}" and role "{}" in proxy route config'.format( connection_id, realm_name, role_name ), ) else: connection_ids.add(connection_id) # remember connections mapped from proxy routes by (realm, authrole) for role_name in config.keys(): connection_id = config[role_name] connection = self._connections[connection_id] key = (realm_name, role_name) if key not in self._connections_by_auth: self._connections_by_auth[key] = set() self._connections_by_auth[key].add(connection) if realm_name not in self._routes: self._routes[realm_name] = dict() route_id = "route{:03d}".format(self._next_route_id) self._next_route_id += 1 route = ProxyRoute(self, realm_name, route_id, config) yield route.start() # remember route by route ID self._routes[realm_name][route_id] = route # remember route by connections for connection_id in connection_ids: if connection_id not in self._routes_by_connection: self._routes_by_connection[connection_id] = set() self._routes_by_connection[connection_id].add(route) returnValue(route.marshal())
@inlineCallbacks @wamp.register(None)
[docs] def stop_proxy_realm_route(self, realm_name, route_id, details=None): """ Stop a currently running proxy route. :param realm_name: The name of the realm to stop the route for. :param route_id: Which route to stop :param details: WAMP call details. :return: Run-time information about the stopped route. """ self.log.info( '{func}(realm_name={realm_name}, caller_authid="{caller_authid}")', func=hltype(self.stop_proxy_realm_route), realm_name=realm_name, caller_authid=hlval(details.caller_authid), ) if realm_name not in self._routes: raise ApplicationError( "crossbar.error.no_such_object", 'no proxy routes for realm "{}" currently running'.format(realm_name) ) if route_id not in self._routes[realm_name]: raise ApplicationError( "crossbar.error.no_such_object", 'no route "{}" for realm "{}" currently running'.format(route_id, realm_name), ) route = self._routes[realm_name][route_id] yield route.stop() del self._routes[realm_name][route_id] # If all routes are stopped, clear the realm from routes map # Relevant discussion: https://github.com/crossbario/crossbar/pull/1968 if len(self._routes[realm_name]) == 0: del self._routes[realm_name] returnValue(route.marshal())
@wamp.register(None)
[docs] def get_proxy_connections(self, details=None): """ Get currently running proxy connections. :param details: WAMP call details. :return: List of run-time IDs of currently running connection.s """ self.log.debug( '{func}(caller_authid="{caller_authid}")', func=hltype(self.get_proxy_connections), caller_authid=hlval(details.caller_authid), ) return sorted(self._connections.keys())
@wamp.register(None)
[docs] def get_proxy_connection(self, connection_id, details=None): """ Get run-time information for a currently running proxy connection. :param connection_id: The run-time ID of the proxy connection to return information for. :param details: WAMP call details. :return: Proxy connection configuration. """ self.log.debug( '{func}(connection_id={connection_id}, caller_authid="{caller_authid}")', func=hltype(self.get_proxy_connection), connection_id=hlid(connection_id), caller_authid=hlval(details.caller_authid), ) if connection_id in self._connections: connection = self._connections[connection_id] return connection.marshal() else: raise ApplicationError( "crossbar.error.no_such_object", 'no proxy connection with ID "{}" currently running'.format(connection_id), )
@inlineCallbacks @wamp.register(None)
[docs] def start_proxy_connection(self, connection_id, config, details=None): """ Start a new backend connection for the proxy. Called from master node orchestration in :method:`crossbar.master.arealm.arealm.ApplicationRealmMonitor._apply_webcluster_connections`. :param connection_id: :param config: :param details: :return: """ self.log.info( "{func}(connection_id={connection_id}, config=.., caller_authid={caller_authid}):\n{config}", func=hltype(self.start_proxy_connection), connection_id=connection_id, config=pformat(config), caller_authid=hlval(details.caller_authid), ) if connection_id in self._connections: raise ApplicationError( "crossbar.error.already_running", 'proxy connection with ID "{}" already running'.format(connection_id) ) connection = ProxyConnection(self, connection_id, config) self._connections[connection_id] = connection yield connection.start() returnValue(connection.marshal())
@inlineCallbacks @wamp.register(None)
[docs] def stop_proxy_connection(self, connection_id, details=None): """ :param connection_id: :param details: :return: """ self.log.info( '{func}(connection_id={connection_id}, caller_authid="{caller_authid}")', func=hltype(self.stop_proxy_connection), connection_id=connection_id, caller_authid=hlval(details.caller_authid), ) if connection_id not in self._connections: raise ApplicationError( "crossbar.error.no_such_object", 'no proxy connection with ID "{}" currently running'.format(connection_id), ) connection = self._connections[connection_id] yield connection.stop() del self._connections[connection_id] returnValue(connection.marshal())
IRealmContainer.register(ProxyController)