Source code for crossbar.router.dealer

#####################################################################################
#
#  Copyright (c) typedef int GmbH
#  SPDX-License-Identifier: EUPL-1.2
#
#####################################################################################

import random

import txaio
from autobahn import util
from autobahn.exception import PayloadExceededError
from autobahn.wamp import message, role, types
from autobahn.wamp.exception import ApplicationError, ProtocolError
from autobahn.wamp.message import (
    _URI_PAT_LOOSE_EMPTY,
    _URI_PAT_LOOSE_LAST_EMPTY,
    _URI_PAT_LOOSE_NON_EMPTY,
    _URI_PAT_STRICT_EMPTY,
    _URI_PAT_STRICT_LAST_EMPTY,
    _URI_PAT_STRICT_NON_EMPTY,
)
from txaio import make_logger

from crossbar._util import hlflag, hlid, hltype
from crossbar.router import NotAttached, RouterOptions
from crossbar.router.observation import UriObservationMap

__all__ = ("Dealer",)


class InvocationRequest(object):
    """
    Holding information for an individual invocation.
    """

    __slots__ = (
        "id",
        "registration",
        "caller",
        "caller_session_id",
        "call",
        "callee",
        "forward_for",
        "authorization",
        "canceled",
        "error_msg",
        "timeout_call",
    )

    def __init__(self, id, registration, caller, call, callee, forward_for, authorization):
        self.id = id
        self.registration = registration
        self.caller = caller
        self.caller_session_id = caller._session_id
        self.call = call
        self.callee = callee
        self.forward_for = forward_for
        self.authorization = authorization
        self.canceled = False
        self.error_msg = None
        self.timeout_call = None  # if we have a timeout pending, this is it


class RegistrationExtra(object):
    """
    Registration-level extra information held in UriObservationMap.
    """

    __slots__ = ("invoke", "roundrobin_current")

    def __init__(self, invoke=message.Register.INVOKE_SINGLE):
        self.invoke = invoke
        self.roundrobin_current = 0


class RegistrationCalleeExtra(object):
    """
    Callee-level extra information held in UriObservationMap.
    """

    __slots__ = ("concurrency", "concurrency_current")

    def __init__(self, concurrency=None):
        self.concurrency = concurrency
        self.concurrency_current = 0

    def __repr__(self):
        return "{}(concurrency={}, concurrency_current={})".format(
            self.__class__.__name__, self.concurrency, self.concurrency_current
        )


def _can_cancel(session, side="callee"):
    """
    :returns: True if the session supports cancel
    """
    return (
        side in session._session_roles and session._session_roles[side] and session._session_roles[side].call_canceling
    )


[docs] class Dealer(object): """ Basic WAMP dealer. """
[docs] log = make_logger()
def __init__(self, router, reactor, options=None): """ :param router: The router this dealer is part of. :type router: Object that implements :class:`crossbar.router.interfaces.IRouter`. :param options: Router options. :type options: Instance of :class:`crossbar.router.types.RouterOptions`. """
[docs] self._router = router
[docs] self._reactor = reactor
[docs] self._cancel_timers = txaio.make_batched_timer(1) # timeouts have to be integers anyway
[docs] self._options = options or RouterOptions()
# generator for WAMP request IDs
[docs] self._request_id_gen = util.IdGenerator()
# registration map managed by this dealer
[docs] self._registration_map = UriObservationMap(ordered=True)
# map: session -> set of registrations (needed for detach)
[docs] self._session_to_registrations = {}
# map: session -> in-flight invocations
[docs] self._callee_to_invocations = {}
# BEWARE: this map must be kept up-to-date along with the # _invocations map below! Use the helper methods # _add_invoke_request and _remove_invoke_request # map: session -> in-flight invocations
[docs] self._caller_to_invocations = {}
# careful here: the 'request' IDs are unique per-session # (only) so we map from (session_id, call) tuples to in-flight invocations # map: (session_id, call) -> in-flight invocations
[docs] self._invocations_by_call = {}
# pending callee invocation requests
[docs] self._invocations = {}
# check all procedure URIs with strict rules
[docs] self._option_uri_strict = self._options.uri_check == RouterOptions.URI_CHECK_STRICT
# supported features from "WAMP Advanced Profile"
[docs] self._role_features = role.RoleDealerFeatures( caller_identification=True, pattern_based_registration=True, session_meta_api=True, registration_meta_api=True, shared_registration=True, progressive_call_results=True, registration_revocation=True, payload_transparency=True, testament_meta_api=True, payload_encryption_cryptobox=True, call_canceling=True, )
# store for call queues if self._router._store: self._call_store = self._router._store else: self._call_store = None
[docs] def attach(self, session): """ Implements :func:`crossbar.router.interfaces.IDealer.attach` """ if session not in self._session_to_registrations: self._session_to_registrations[session] = set() else: raise Exception("session with ID {} already attached".format(session._session_id))
[docs] def detach(self, session): """ Implements :func:`crossbar.router.interfaces.IDealer.detach` """ # if the caller on an in-flight invocation goes away # INTERRUPT the callee if supported is_rlink_session = session._authrole == "rlink" if session in self._caller_to_invocations: # this needs to update all four places where we track invocations similar to _remove_invoke_request outstanding = self._caller_to_invocations.get(session, []) for invoke in outstanding: # type: InvocationRequest if invoke.canceled: continue if invoke.callee is invoke.caller: # if the calling itself - no need to notify continue callee = invoke.callee if ( "callee" not in callee._session_roles or not callee._session_roles["callee"] or not callee._session_roles["callee"].call_canceling ): self.log.debug( "INTERRUPT not supported on in-flight INVOKE with id={request} on" " session {session} (caller went away)", request=invoke.id, session=session._session_id, ) continue self.log.debug( "INTERRUPTing in-flight INVOKE with id={request} on session {session} (caller went away)", request=invoke.id, session=session._session_id, ) if invoke.timeout_call: invoke.timeout_call.cancel() invoke.timeout_call = None invokes = self._callee_to_invocations[callee] invokes.remove(invoke) if not invokes: del self._callee_to_invocations[callee] del self._invocations[invoke.id] del self._invocations_by_call[(invoke.caller_session_id, invoke.call.request)] self._router.send( invoke.callee, message.Interrupt( request=invoke.id, mode=message.Cancel.KILLNOWAIT, ), ) del self._caller_to_invocations[session] if session in self._session_to_registrations: # send out Errors for any in-flight calls we have outstanding = self._callee_to_invocations.get(session, []) for invoke in outstanding: self.log.debug( "Cancelling in-flight INVOKE with id={request} on session {session}", request=invoke.call.request, session=session._session_id, ) reply = message.Error( message.Call.MESSAGE_TYPE, invoke.call.request, ApplicationError.CANCELED, ["callee disconnected from in-flight request"], ) # send this directly to the caller's session # (it is possible the caller was disconnected and thus # _transport is None before we get here though) if invoke.caller._transport: invoke.caller._transport.send(reply) if invoke.timeout_call: invoke.timeout_call.cancel() invoke.timeout_call = None invokes = self._caller_to_invocations[invoke.caller] invokes.remove(invoke) if not invokes: del self._caller_to_invocations[invoke.caller] del self._invocations[invoke.id] del self._invocations_by_call[(invoke.caller_session_id, invoke.call.request)] if outstanding: del self._callee_to_invocations[session] for registration in self._session_to_registrations[session]: was_registered, was_last_callee = self._registration_map.drop_observer(session, registration) if was_registered and was_last_callee: self._registration_map.delete_observation(registration) # publish WAMP meta events, if we have a service session, but # not for the meta API itself! # if self._router._realm and self._router._realm.session and not registration.uri.startswith("wamp."): def _publish(registration): service_session = self._router._realm.session # FIXME: what about exclude_authid as collected from forward_for? like we do elsewhere in this file! options = types.PublishOptions(correlation_id=None) if was_registered: service_session.publish( "wamp.registration.on_unregister", session._session_id, registration.id, options=options, ) if was_last_callee: if not is_rlink_session: service_session.publish( "wamp.registration.on_delete", session._session_id, registration.id, options=options, ) # we postpone actual sending of meta events until we return to this client session self._reactor.callLater(0, _publish, registration) del self._session_to_registrations[session] else: raise NotAttached("session with ID {} not attached".format(session._session_id))
[docs] def processRegister(self, session, register): """ Implements :func:`crossbar.router.interfaces.IDealer.processRegister` """ # check topic URI: for SUBSCRIBE, must be valid URI (either strict or loose), and all # URI components must be non-empty other than for wildcard subscriptions # is_rlink_session = session._authrole == "rlink" if self._router.is_traced: if not register.correlation_id: register.correlation_id = self._router.new_correlation_id() register.correlation_is_anchor = True register.correlation_is_last = False if not register.correlation_uri: register.correlation_uri = register.procedure self._router._factory._worker._maybe_trace_rx_msg(session, register) if self._option_uri_strict: if register.match == "wildcard": uri_is_valid = _URI_PAT_STRICT_EMPTY.match(register.procedure) elif register.match == "prefix": uri_is_valid = _URI_PAT_STRICT_LAST_EMPTY.match(register.procedure) elif register.match == "exact": uri_is_valid = _URI_PAT_STRICT_NON_EMPTY.match(register.procedure) else: # should not arrive here raise Exception("logic error") else: if register.match == "wildcard": uri_is_valid = _URI_PAT_LOOSE_EMPTY.match(register.procedure) elif register.match == "prefix": uri_is_valid = _URI_PAT_LOOSE_LAST_EMPTY.match(register.procedure) elif register.match == "exact": uri_is_valid = _URI_PAT_LOOSE_NON_EMPTY.match(register.procedure) else: # should not arrive here raise Exception("logic error") if not uri_is_valid: reply = message.Error( message.Register.MESSAGE_TYPE, register.request, ApplicationError.INVALID_URI, [ "register for invalid procedure URI '{0}' (URI strict checking {1})".format( register.procedure, self._option_uri_strict ) ], ) reply.correlation_id = register.correlation_id reply.correlation_uri = register.procedure reply.correlation_is_anchor = False reply.correlation_is_last = True self._router.send(session, reply) return # disallow registration of procedures starting with "wamp." other than for # trusted sessions (that are sessions built into Crossbar.io routing core) # if session._authrole is not None and session._authrole != "trusted": is_restricted = register.procedure.startswith("wamp.") if is_restricted: reply = message.Error( message.Register.MESSAGE_TYPE, register.request, ApplicationError.INVALID_URI, ["register for restricted procedure URI '{0}')".format(register.procedure)], ) reply.correlation_id = register.correlation_id reply.correlation_uri = register.procedure reply.correlation_is_anchor = False reply.correlation_is_last = True self._router.send(session, reply) return # authorize REGISTER action # d = self._router.authorize(session, register.procedure, "register", options=register.marshal_options()) def on_authorize_success(authorization): # check the authorization before ANYTHING else, otherwise # we may leak information about already-registered URIs etc. if not register.procedure.endswith(".on_log"): self.log.debug( '{func}::on_authorize_success() - permission {result} for REGISTER of procedure "{procedure}" [realm="{realm}", session_id={session_id}, authid="{authid}", authrole="{authrole}"]', func=hltype(self.processRegister), result=hlflag(authorization["allow"], "GRANTED", "DENIED"), procedure=hlid(register.procedure), realm=hlid(session._realm), session_id=hlid(session._session_id), authid=hlid(session._authid), authrole=hlid(session._authrole), ) if not authorization["allow"]: # error reply since session is not authorized to register # reply = message.Error( message.Register.MESSAGE_TYPE, register.request, ApplicationError.NOT_AUTHORIZED, [ 'session (session_id={}, authid="{}", authrole="{}") is not authorized to register procedure "{}" on realm "{}"'.format( session._session_id, session._authid, session._authrole, register.procedure, session._realm ) ], ) # get existing registration for procedure / matching strategy - if any # registration = self._registration_map.get_observation(register.procedure, register.match) # if the session disconnected while the authorization was # being checked, stop if session not in self._session_to_registrations: # if the session *really* disconnected, it won't have # a _session_id any longer, so we double-check if session._session_id is not None: self.log.error( "Session '{session_id}' still appears valid, but isn't in registration map", session_id=session._session_id, ) self.log.info( "Session vanished while registering '{procedure}'", procedure=register.procedure, ) assert registration is None return # if force_reregister was enabled, we only do any actual # kicking of existing registrations *after* authorization if registration and not register.force_reregister: # there is an existing registration, and that has an # invocation strategy that only allows a single callee # on a the given registration # if registration.extra.invoke == message.Register.INVOKE_SINGLE: reply = message.Error( message.Register.MESSAGE_TYPE, register.request, ApplicationError.PROCEDURE_ALREADY_EXISTS, ["register for already registered procedure '{0}'".format(register.procedure)], ) reply.correlation_id = register.correlation_id reply.correlation_uri = register.procedure reply.correlation_is_anchor = False reply.correlation_is_last = True self._router.send(session, reply) return # there is an existing registration, and that has an # invokation strategy different from the one requested # by the new callee # if registration.extra.invoke != register.invoke: reply = message.Error( message.Register.MESSAGE_TYPE, register.request, ApplicationError.PROCEDURE_EXISTS_INVOCATION_POLICY_CONFLICT, [ "register for already registered procedure '{0}' " "with conflicting invocation policy (has {1} and " "{2} was requested)".format(register.procedure, registration.extra.invoke, register.invoke) ], ) reply.correlation_id = register.correlation_id reply.correlation_uri = register.procedure reply.correlation_is_anchor = False reply.correlation_is_last = True self._router.send(session, reply) return # this check is a little redundant, because theoretically # we already returned (above) if this was False, but for safety... if authorization["allow"]: registration = self._registration_map.get_observation(register.procedure, register.match) if register.force_reregister and registration: for obs in registration.observers: self._registration_map.drop_observer(obs, registration) kicked = message.Unregistered( 0, registration=registration.id, reason="wamp.error.unregistered", ) kicked.correlation_id = register.correlation_id kicked.correlation_uri = register.procedure kicked.correlation_is_anchor = False kicked.correlation_is_last = False self._router.send(obs, kicked) self._registration_map.delete_observation(registration) # ok, session authorized to register. now get the registration # registration_extra = RegistrationExtra(register.invoke) registration_callee_extra = RegistrationCalleeExtra(register.concurrency) registration, was_already_registered, is_first_callee = self._registration_map.add_observer( session, register.procedure, register.match, registration_extra, registration_callee_extra ) if not was_already_registered: self._session_to_registrations[session].add(registration) # acknowledge register with registration ID # reply = message.Registered(register.request, registration.id) reply.correlation_id = register.correlation_id reply.correlation_uri = register.procedure reply.correlation_is_anchor = False # publish WAMP meta events, if we have a service session, but # not for the meta API itself! # if ( self._router._realm and self._router._realm.session and not registration.uri.startswith("wamp.") and (is_first_callee or not was_already_registered) ): reply.correlation_is_last = False # when this message was forwarded from other nodes, exclude all such nodes # from receiving the meta event we'll publish below by authid (of the r2r link # from the forwarding node connected to this router node) exclude_authid = None if register.forward_for: exclude_authid = [ff["authid"] for ff in register.forward_for] self.log.info( "WAMP meta event will be published excluding these authids (from forward_for): {exclude_authid}", exclude_authid=exclude_authid, ) def _publish(): service_session = self._router._realm.session if exclude_authid or self._router.is_traced: options = types.PublishOptions( correlation_id=register.correlation_id, correlation_is_anchor=False, correlation_is_last=False, exclude_authid=exclude_authid, ) else: options = None if is_first_callee: registration_details = { "id": registration.id, "created": registration.created, "uri": registration.uri, "match": registration.match, "invoke": registration.extra.invoke, } if not is_rlink_session: service_session.publish( "wamp.registration.on_create", session._session_id, registration_details, options=options, ) if not was_already_registered: if options: options.correlation_is_last = True if not is_rlink_session: service_session.publish( "wamp.registration.on_register", session._session_id, registration.id, options=options, ) # we postpone actual sending of meta events until we return to this client session self._reactor.callLater(0, _publish) else: reply.correlation_is_last = True # send out reply to register requestor # self._router.send(session, reply) def on_authorize_error(err): """ the call to authorize the action _itself_ failed (note this is different from the call to authorize succeed, but the authorization being denied) """ self.log.failure("Authorization of 'register' for '{uri}' failed", uri=register.procedure, failure=err) reply = message.Error( message.Register.MESSAGE_TYPE, register.request, ApplicationError.AUTHORIZATION_FAILED, [ "failed to authorize session for registering procedure '{0}': {1}".format( register.procedure, err.value ) ], ) reply.correlation_id = register.correlation_id reply.correlation_uri = register.procedure reply.correlation_is_anchor = False reply.correlation_is_last = True self._router.send(session, reply) txaio.add_callbacks(d, on_authorize_success, on_authorize_error)
[docs] def processUnregister(self, session, unregister): """ Implements :func:`crossbar.router.interfaces.IDealer.processUnregister` """ if self._router.is_traced: if not unregister.correlation_id: unregister.correlation_id = self._router.new_correlation_id() unregister.correlation_is_anchor = True unregister.correlation_is_last = False # get registration by registration ID or None (if it doesn't exist on this broker) # registration = self._registration_map.get_observation_by_id(unregister.registration) if registration: if self._router.is_traced and not unregister.correlation_uri: unregister.correlation_uri = registration.uri if session in registration.observers: was_registered, was_last_callee, has_follow_up_messages = self._unregister( registration, session, unregister ) reply = message.Unregistered(unregister.request) if self._router.is_traced: reply.correlation_uri = registration.uri reply.correlation_is_last = not has_follow_up_messages else: # registration exists on this dealer, but the session that wanted to unregister wasn't registered # reply = message.Error( message.Unregister.MESSAGE_TYPE, unregister.request, ApplicationError.NO_SUCH_REGISTRATION ) if self._router.is_traced: reply.correlation_uri = reply.error reply.correlation_is_last = True else: # registration doesn't even exist on this broker # reply = message.Error( message.Unregister.MESSAGE_TYPE, unregister.request, ApplicationError.NO_SUCH_REGISTRATION ) if self._router.is_traced: reply.correlation_uri = reply.error reply.correlation_is_last = True if self._router.is_traced: self._router._factory._worker._maybe_trace_rx_msg(session, unregister) reply.correlation_id = unregister.correlation_id reply.correlation_is_anchor = False self._router.send(session, reply)
[docs] def _unregister(self, registration, session, unregister=None): # drop session from registration observers # was_registered, was_last_callee = self._registration_map.drop_observer(session, registration) was_deleted = False is_rlink_session = session._authrole == "rlink" if was_registered and was_last_callee: self._registration_map.delete_observation(registration) was_deleted = True # remove registration from session->registrations map # if was_registered: self._session_to_registrations[session].discard(registration) # publish WAMP meta events, if we have a service session, but # not for the meta API itself! # if ( self._router._realm and self._router._realm.session and not registration.uri.startswith("wamp.") and (was_registered or was_deleted) ): has_follow_up_messages = True # when this message was forwarded from other nodes, exclude all such nodes # from receiving the meta event we'll publish below by authid (of the r2r link # from the forwarding node connected to this router node) exclude_authid = None if unregister.forward_for: exclude_authid = [ff["authid"] for ff in unregister.forward_for] self.log.info( "WAMP meta event will be published excluding these authids (from forward_for): {exclude_authid}", exclude_authid=exclude_authid, ) def _publish(): service_session = self._router._realm.session if unregister and (exclude_authid and self._router.is_traced): options = types.PublishOptions( correlation_id=unregister.correlation_id, correlation_is_anchor=False, correlation_is_last=False, exclude_authid=exclude_authid, ) else: options = None if was_registered: service_session.publish( "wamp.registration.on_unregister", session._session_id, registration.id, options=options ) if was_deleted: if options: options.correlation_is_last = True if not is_rlink_session: service_session.publish( "wamp.registration.on_delete", session._session_id, registration.id, options=options ) # we postpone actual sending of meta events until we return to this client session self._reactor.callLater(0, _publish) else: has_follow_up_messages = False return was_registered, was_last_callee, has_follow_up_messages
[docs] def removeCallee(self, registration, session, reason=None): """ Actively unregister a callee session from a registration. """ was_registered, was_last_callee, _ = self._unregister(registration, session) # actively inform the callee that it has been unregistered # if ( "callee" in session._session_roles and session._session_roles["callee"] and session._session_roles["callee"].registration_revocation ): reply = message.Unregistered(0, registration=registration.id, reason=reason) reply.correlation_uri = registration.uri self._router.send(session, reply) return was_registered, was_last_callee
[docs] def processCall(self, session, call: message.Call): """ Implements :func:`crossbar.router.interfaces.IDealer.processCall` """ if self._router.is_traced: if not call.correlation_id: call.correlation_id = self._router.new_correlation_id() call.correlation_is_anchor = True call.correlation_is_last = False if not call.correlation_uri: call.correlation_uri = call.procedure self._router._factory._worker._maybe_trace_rx_msg(session, call) # check procedure URI: for CALL, must be valid URI (either strict or loose), and # all URI components must be non-empty if self._option_uri_strict: uri_is_valid = _URI_PAT_STRICT_NON_EMPTY.match(call.procedure) else: uri_is_valid = _URI_PAT_LOOSE_NON_EMPTY.match(call.procedure) if not uri_is_valid: reply = message.Error( message.Call.MESSAGE_TYPE, call.request, ApplicationError.INVALID_URI, [ "call with invalid procedure URI '{0}' (URI strict checking {1})".format( call.procedure, self._option_uri_strict ) ], ) reply.correlation_id = call.correlation_id reply.correlation_uri = call.procedure reply.correlation_is_anchor = False reply.correlation_is_last = True self._router.send(session, reply) return # authorize CALL action: # - returns an authorization dict # - might use static (aka "permissions") or dynamic authorizers # - might be cached in the router # - might include a payload validate stanza # - will be store (later) in InvocationRequest.authorization d = self._router.authorize(session, call.procedure, "call", options=call.marshal_options()) def on_authorize_success(authorization): # authorization = { # 'allow': True, # 'disclose': False, # 'validate': { # 'catalog': 'pydefi.eth', # 'args': None, # 'kwargs': None, # 'results': ['Address'], # 'kwresults': None, # }, # 'meta': { # 'args': None, # 'kwargs': { # 'clock_oid': _clock_oid # } # }, # 'cache': True, # } # the call to authorize the action _itself_ succeeded. now go on depending on whether # the action was actually authorized or not if not call.procedure.endswith(".on_log"): self.log.debug( '{func}::on_authorize_success() - permission {result} for CALL of procedure "{procedure}" [realm="{realm}", session_id={session_id}, authid="{authid}", authrole="{authrole}"]', func=hltype(self.processCall), result=hlflag(authorization["allow"], "GRANTED", "DENIED"), procedure=hlid(call.procedure), realm=hlid(session._realm), session_id=hlid(session._session_id), authid=hlid(session._authid), authrole=hlid(session._authrole), ) if not authorization["allow"]: reply = message.Error( message.Call.MESSAGE_TYPE, call.request, ApplicationError.NOT_AUTHORIZED, [ 'session (session_id={}, authid="{}", authrole="{}") is not authorized to ' 'call procedure "{}" on realm "{}"'.format( session._session_id, session._authid, session._authrole, call.procedure, session._realm ) ], ) reply.correlation_id = call.correlation_id reply.correlation_uri = call.procedure reply.correlation_is_anchor = False reply.correlation_is_last = True self._router.send(session, reply) else: # validate payload (skip in "payload_transparency" mode) if call.payload is None: try: self._router.validate( "call", call.procedure, call.args, call.kwargs, validate=authorization.get("validate", None), ) except Exception as e: reply = message.Error( message.Call.MESSAGE_TYPE, call.request, ApplicationError.INVALID_ARGUMENT, [ "call of procedure '{0}' with invalid application payload: {1}".format( call.procedure, e ) ], ) reply.correlation_id = call.correlation_id reply.correlation_uri = call.procedure reply.correlation_is_anchor = False reply.correlation_is_last = True self._router.send(session, reply) return # get registrations active on the procedure called registration = self._registration_map.best_matching_observation(call.procedure) # if the session disconnected while the authorization was being checked, # 'registration' will be None, and we'll (correctly) fire an error. if registration: # now actually perform the invocation of the callee self._call(session, call, registration, authorization) else: reply = message.Error( message.Call.MESSAGE_TYPE, call.request, ApplicationError.NO_SUCH_PROCEDURE, ["no callee registered for procedure <{0}>".format(call.procedure)], ) reply.correlation_id = call.correlation_id reply.correlation_uri = call.procedure reply.correlation_is_anchor = False reply.correlation_is_last = True self._router.send(session, reply) def on_authorize_error(err): """ the call to authorize the action _itself_ failed (note this is different from the call to authorize succeed, but the authorization being denied) """ self.log.failure("Authorization of 'call' for '{uri}' failed", uri=call.procedure, failure=err) reply = message.Error( message.Call.MESSAGE_TYPE, call.request, ApplicationError.AUTHORIZATION_FAILED, ["failed to authorize session for calling procedure '{0}': {1}".format(call.procedure, err.value)], ) reply.correlation_id = call.correlation_id reply.correlation_uri = call.procedure reply.correlation_is_anchor = False reply.correlation_is_last = True self._router.send(session, reply) txaio.add_callbacks(d, on_authorize_success, on_authorize_error)
[docs] def _call(self, session, call, registration, authorization, is_queued_call=False): # will hold the callee (the concrete endpoint) that we will forward the call to .. # callee = None callee_extra = None # determine callee according to invocation policy # if registration.extra.invoke in [ message.Register.INVOKE_SINGLE, message.Register.INVOKE_FIRST, message.Register.INVOKE_LAST, ]: # a single endpoint is considered for forwarding the call .. if registration.extra.invoke == message.Register.INVOKE_SINGLE: callee = registration.observers[0] elif registration.extra.invoke == message.Register.INVOKE_FIRST: callee = registration.observers[0] elif registration.extra.invoke == message.Register.INVOKE_LAST: callee = registration.observers[len(registration.observers) - 1] else: # should not arrive here raise Exception("logic error") # check maximum concurrency of the (single) endpoint callee_extra = registration.observers_extra.get(callee, None) if callee_extra: if callee_extra.concurrency and callee_extra.concurrency_current >= callee_extra.concurrency: if is_queued_call or ( self._call_store and self._call_store.maybe_queue_call(session, call, registration, authorization) ): return False else: reply = message.Error( message.Call.MESSAGE_TYPE, call.request, "crossbar.error.max_concurrency_reached", [ "maximum concurrency {} of callee/endpoint reached (on " "non-shared/single registration)".format(callee_extra.concurrency) ], ) reply.correlation_id = call.correlation_id reply.correlation_uri = call.procedure reply.correlation_is_anchor = False reply.correlation_is_last = True self._router.send(session, reply) return False else: callee_extra.concurrency_current += 1 elif registration.extra.invoke == message.Register.INVOKE_ROUNDROBIN: # remember where we started to search for a suitable callee/endpoint in the # round-robin list of callee endpoints roundrobin_start_index = registration.extra.roundrobin_current % len(registration.observers) # now search fo a suitable callee/endpoint while True: callee = registration.observers[registration.extra.roundrobin_current % len(registration.observers)] callee_extra = registration.observers_extra.get(callee, None) registration.extra.roundrobin_current += 1 if callee_extra and callee_extra.concurrency: if callee_extra.concurrency_current >= callee_extra.concurrency: # this callee has set a maximum concurrency that has already been reached. # we need to search further, but only if we haven't reached the beginning # of our round-robin list if ( registration.extra.roundrobin_current % len(registration.observers) == roundrobin_start_index ): # we've looked through the whole round-robin list, and didn't find a suitable # callee (one that hasn't it's maximum concurrency already reached). if is_queued_call or ( self._call_store and self._call_store.maybe_queue_call(session, call, registration, authorization) ): return False else: reply = message.Error( message.Call.MESSAGE_TYPE, call.request, "crossbar.error.max_concurrency_reached", [ "maximum concurrency {} of all callee/endpoints reached (on " "round-robin registration)".format(callee_extra.concurrency) ], ) reply.correlation_id = call.correlation_id reply.correlation_uri = call.procedure reply.correlation_is_anchor = False reply.correlation_is_last = True self._router.send(session, reply) return False else: # .. search on .. pass else: # ok, we've found a callee that has set a maximum concurrency, but where the # maximum has not yet been reached break else: # ok, we've found a callee which hasn't set a maximum concurrency, and hence is always # eligible for having a call forwarded to break if callee_extra: callee_extra.concurrency_current += 1 elif registration.extra.invoke == message.Register.INVOKE_RANDOM: # FIXME: implement max. concurrency and call queueing callee = registration.observers[random.randint(0, len(registration.observers) - 1)] else: # should not arrive here raise Exception("logic error") # new ID for the invocation # invocation_request_id = self._request_id_gen.next() # caller disclosure # if authorization.get("disclose", False): disclose = True elif call.procedure.startswith("wamp.") or call.procedure.startswith("crossbar."): disclose = True else: disclose = False # set the disclosed caller and forward_for # forward_for = None if disclose: if call.forward_for: # forwarded call: ultimate caller is the first in forward_for caller = call.forward_for[0]["session"] caller_authid = call.forward_for[0]["authid"] caller_authrole = call.forward_for[0]["authrole"] # append this session (a r2r link) to forward_for assert session._session_id is not None forward_for = call.forward_for + [ { "session": session._session_id, "authid": session._authid, "authrole": session._authrole, } ] else: # non-forwarded call: ultimate caller is this session caller = session._session_id caller_authid = session._authid caller_authrole = session._authrole else: # caller session isn't disclosed to the callee caller = None caller_authid = None caller_authrole = None # for pattern-based registrations, the INVOCATION must contain # the actual procedure being called # if registration.match != message.Register.MATCH_EXACT: procedure = call.procedure else: procedure = None if call.payload: invocation = message.Invocation( invocation_request_id, registration.id, payload=call.payload, timeout=call.timeout, receive_progress=call.receive_progress, caller=caller, caller_authid=caller_authid, caller_authrole=caller_authrole, procedure=procedure, transaction_hash=call.transaction_hash, enc_algo=call.enc_algo, enc_key=call.enc_key, enc_serializer=call.enc_serializer, forward_for=forward_for, ) else: invocation = message.Invocation( invocation_request_id, registration.id, args=call.args, kwargs=call.kwargs, timeout=call.timeout, receive_progress=call.receive_progress, caller=caller, caller_authid=caller_authid, caller_authrole=caller_authrole, procedure=procedure, transaction_hash=call.transaction_hash, forward_for=forward_for, ) invocation.correlation_id = call.correlation_id invocation.correlation_uri = call.procedure invocation.correlation_is_anchor = False invocation.correlation_is_last = False # for internal use by rlinks invocation._router_internal = ( session._session_id, # caller session._authid, session._authrole, ) self._add_invoke_request( invocation_request_id, registration, session, call, callee, forward_for, authorization, timeout=call.timeout, ) self._router.send(callee, invocation) return True
[docs] def _add_invoke_request( self, invocation_request_id, registration, session, call, callee, forward_for, authorization, timeout=None ): """ Internal helper. Adds an InvocationRequest to both the _callee_to_invocations and _invocations maps. """ invoke_request = InvocationRequest( invocation_request_id, registration, session, call, callee, forward_for, authorization ) self._invocations[invocation_request_id] = invoke_request self._invocations_by_call[session._session_id, call.request] = invoke_request invokes = self._callee_to_invocations.get(callee, []) invokes.append(invoke_request) self._callee_to_invocations[callee] = invokes # map to keep track of the invocations by each caller invokes = self._caller_to_invocations.get(session, []) invokes.append(invoke_request) self._caller_to_invocations[session] = invokes # deal with possible timeouts # NB: timeouts can only be integers (check spec, but this is # what Autobahn code says) so we can just have a bucket-based # thing (and probably should?) instead of "straight" callLater if timeout: def _cancel_both_sides(): """ The timeout was reacted; send an ERROR to the caller and INTERRUPT to the callee """ if _can_cancel(invoke_request.caller, "caller"): self._router.send( invoke_request.caller, message.Error( message.Call.MESSAGE_TYPE, call.request, ApplicationError.CANCELED, ["timeout reached"], ), ) if _can_cancel(invoke_request.callee, "callee"): self._router.send( invoke_request.callee, message.Interrupt( invoke_request.id, message.Cancel.KILLNOWAIT, # or KILL ? ), ) self._remove_invoke_request(invoke_request) invoke_request.timeout_call = self._cancel_timers.call_later(timeout, _cancel_both_sides) return invoke_request
[docs] def _remove_invoke_request(self, invocation_request): """ Internal helper. Removes an InvocationRequest from both the _callee_to_invocations and _invocations maps. """ if invocation_request.timeout_call: invocation_request.timeout_call.cancel() invocation_request.timeout_call = None # all four places should always be updated together if invocation_request.id in self._invocations: del self._invocations[invocation_request.id] invokes = self._callee_to_invocations[invocation_request.callee] invokes.remove(invocation_request) if not invokes: del self._callee_to_invocations[invocation_request.callee] invokes = self._caller_to_invocations[invocation_request.caller] invokes.remove(invocation_request) if not invokes: del self._caller_to_invocations[invocation_request.caller] del self._invocations_by_call[invocation_request.caller_session_id, invocation_request.call.request]
# noinspection PyUnusedLocal
[docs] def processCancel(self, session, cancel): """ Implements :func:`crossbar.router.interfaces.IDealer.processCancel` A 'caller' has sent us a message wishing to cancel a still- outstanding call. They can request 1 of 3 modes of cancellation. """ if (session._session_id, cancel.request) in self._invocations_by_call: invocation_request = self._invocations_by_call[session._session_id, cancel.request] if self._router.is_traced: # correlate the cancel request to the original call cancel.correlation_id = invocation_request.call.correlation_id cancel.correlation_uri = invocation_request.call.procedure cancel.correlation_is_anchor = False cancel.correlation_is_last = False self._router._factory._worker._maybe_trace_rx_msg(session, cancel) # for those that repeatedly push elevator buttons if invocation_request.canceled: return invocation_request.canceled = True # "skip" or "kill" or "killnowait" (see WAMP section.14.3.4) cancellation_mode = cancel.mode if not _can_cancel(invocation_request.callee, "callee"): # callee can't deal with an "Interrupt" cancellation_mode = message.Cancel.SKIP # on the incoming Cancel, we send Interrupt to "callee" # and Error to "caller", with a couple wrinkles based on # the mode: # - if "skip": don't send Interrupt to 'callee' # - if "kill": don't send Error until incoming Error from 'callee' # - if "killnowait": send Interrupt + Error immediately # # Note that if we've sent nothing to the callee, we must # simply ignore any "error" or "result" we get back from # them. Similarly if we send Error immediately (where we # must ignore Error if/when the callee sends it). # (XXX need test for ^ case) if cancellation_mode != message.Cancel.SKIP: # FIXME disclose = True forward_for = None if disclose: if cancel.forward_for: # append this calling session (a r2r link) to forward_for assert session._session_id is not None forward_for = cancel.forward_for + [ { "session": session._session_id, "authid": session._authid, "authrole": session._authrole, } ] interrupt_mode = cancellation_mode # "kill" or "killnowait" interrupt = message.Interrupt(invocation_request.id, interrupt_mode, forward_for=forward_for) if self._router.is_traced: interrupt.correlation_id = invocation_request.call.correlation_id interrupt.correlation_uri = invocation_request.call.procedure interrupt.correlation_is_anchor = False interrupt.correlation_is_last = False self._router.send(invocation_request.callee, interrupt) # we also need to send an "Error" over to the other # side -- *when* we do that depends on the mode. error_msg = message.Error( message.Call.MESSAGE_TYPE, cancel.request, ApplicationError.CANCELED, ) if cancellation_mode == message.Cancel.KILL: # not SKIP, not KILLNOWAIT # send the message only after we get Error back from callee invocation_request.error_msg = error_msg else: # send the message immediately (have to make sure # we'll ignore any Error or Result from callee) self._router.send(session, error_msg) return
[docs] def processYield(self, session, yield_): """ Implements :func:`crossbar.router.interfaces.IDealer.processYield` """ # assert(session in self._session_to_registrations) if yield_.request in self._invocations: # get the invocation request tracked for the caller # invocation_request = self._invocations[yield_.request] if self._router.is_traced: # correlate the received yield return to the original call yield_.correlation_id = invocation_request.call.correlation_id yield_.correlation_uri = invocation_request.call.correlation_uri yield_.correlation_is_anchor = False yield_.correlation_is_last = False self._router._factory._worker._maybe_trace_rx_msg(session, yield_) # check to make sure this session is the one that is supposed to be yielding if invocation_request.callee is not session: raise ProtocolError( "Dealer.onYield(): YIELD received for non-owned request ID {0}".format(yield_.request) ) reply = None # FIXME disclose = True # set the disclosed callee and forward_for # forward_for = None if disclose: if yield_.forward_for: # forwarded call result: ultimate callee is the first in forward_for callee = yield_.forward_for[0]["session"] callee_authid = yield_.forward_for[0]["authid"] callee_authrole = yield_.forward_for[0]["authrole"] # append this session (a r2r link) to forward_for assert session._session_id is not None forward_for = yield_.forward_for + [ { "session": session._session_id, "authid": session._authid, "authrole": session._authrole, } ] else: # non-forwarded call result: ultimate callee is this session callee = session._session_id callee_authid = session._authid callee_authrole = session._authrole else: # callee session isn't disclosed to the caller callee = None callee_authid = None callee_authrole = None # we've maybe cancelled this invocation if invocation_request.canceled: # see if we're waiting to send the error back to the # other side (do we want to do this ONLY on Error # coming back? Logically, if we've sent a Cancel and # the callee supports cancelling, they shouldn't send # us a Yield -- but there's a race-condition here if # we're sending the Cancel as they're sending the # Yield) if invocation_request.error_msg: reply = invocation_request.error_msg invocation_request.error_msg = None # we're now done; we've gotten some response from the # callee -- if the type was "skip", we'll never get an # Error back (NB: if we hit the race condition above, # we *will* get an Error back "soon" but NOT if the # cancel mode was "skip") call_complete = True else: # not canceled is_valid = True if yield_.payload is None: # validate normal args/kwargs payload try: self._router.validate( "call_result", invocation_request.call.procedure, yield_.args, yield_.kwargs, validate=invocation_request.authorization.get("validate", None), ) except Exception as e: is_valid = False reply = message.Error( message.Call.MESSAGE_TYPE, invocation_request.call.request, ApplicationError.INVALID_ARGUMENT, [ "call result from procedure '{0}' with invalid application payload: {1}".format( invocation_request.call.procedure, e ) ], callee=callee, callee_authid=callee_authid, callee_authrole=callee_authrole, forward_for=forward_for, ) else: reply = message.Result( invocation_request.call.request, args=yield_.args, kwargs=yield_.kwargs, progress=yield_.progress, callee=callee, callee_authid=callee_authid, callee_authrole=callee_authrole, forward_for=forward_for, ) else: reply = message.Result( invocation_request.call.request, payload=yield_.payload, progress=yield_.progress, enc_algo=yield_.enc_algo, enc_key=yield_.enc_key, enc_serializer=yield_.enc_serializer, callee=callee, callee_authid=callee_authid, callee_authrole=callee_authrole, forward_for=forward_for, ) # the call is done if it's a regular call (non-progressive) or if the payload was invalid # if not yield_.progress or not is_valid: call_complete = True else: call_complete = False # the calling session might have been lost in the meantime .. # if invocation_request.caller._transport and reply: if self._router.is_traced: reply.correlation_id = invocation_request.call.correlation_id reply.correlation_uri = invocation_request.call.correlation_uri reply.correlation_is_anchor = False reply.correlation_is_last = call_complete try: self._router.send(invocation_request.caller, reply) except PayloadExceededError as e: # cannot send result to original caller as the result size surpasses the # transport limit (the maximum message size the original calling client is # willing to receive) call_complete = True reply = message.Error( message.Call.MESSAGE_TYPE, invocation_request.call.request, ApplicationError.INVALID_ARGUMENT, [ "call result from procedure '{0}' with invalid application payload: {1}".format( invocation_request.call.procedure, e ) ], callee=callee, callee_authid=callee_authid, callee_authrole=callee_authrole, forward_for=forward_for, ) reply.correlation_id = invocation_request.call.correlation_id reply.correlation_uri = invocation_request.call.correlation_uri reply.correlation_is_anchor = False reply.correlation_is_last = call_complete self._router.send(invocation_request.caller, reply) if call_complete: # reduce current concurrency on callee callee_extra = invocation_request.registration.observers_extra.get(session, None) if callee_extra: callee_extra.concurrency_current -= 1 # cleanup the (individual) invocation self._remove_invoke_request(invocation_request) # check for any calls queued on the registration for which an # invocation just returned, and hence there is likely concurrency # free again to actually forward calls previously queued calls # that were queued because no callee endpoint concurrency was free if self._call_store: queued_call = self._call_store.get_queued_call(invocation_request.registration) if queued_call: invocation_sent = self._call( queued_call.session, queued_call.call, queued_call.registration, queued_call.authorization, True, ) # only actually pop the queued call when we really were # able to forward the call now if invocation_sent: self._call_store.pop_queued_call(invocation_request.registration) else: self.log.debug( "Dealer.onYield(): YIELD received for non-pending request ID {request_id}", request_id=yield_.request )
[docs] def processInvocationError(self, session, error): """ Implements :func:`crossbar.router.interfaces.IDealer.processInvocationError` """ # assert(session in self._session_to_registrations) if error.request in self._invocations: # get the invocation request tracked for the caller invocation_request = self._invocations[error.request] if self._router.is_traced: # correlate the received invocation error to the original call error.correlation_id = invocation_request.call.correlation_id error.correlation_uri = invocation_request.call.correlation_uri error.correlation_is_anchor = False error.correlation_is_last = False self._router._factory._worker._maybe_trace_rx_msg(session, error) # if concurrency is enabled on this, an error counts as # "an answer" so we decrement. callee_extra = invocation_request.registration.observers_extra.get(session, None) if callee_extra: callee_extra.concurrency_current -= 1 reply = None # FIXME disclose = False # set the disclosed callee and forward_for # forward_for = None if disclose: if error.forward_for: # forwarded call: ultimate caller is the first in forward_for callee = invocation_request.forward_for[0]["session"] callee_authid = invocation_request.forward_for[0]["authid"] callee_authrole = invocation_request.forward_for[0]["authrole"] # append this session (a r2r link) to forward_for assert session._session_id is not None forward_for = error.forward_for + [ { "session": session._session_id, "authid": session._authid, "authrole": session._authrole, } ] else: # non-forwarded call result: ultimate callee is this session callee = session._session_id callee_authid = session._authid callee_authrole = session._authrole else: # callee session isn't disclosed to the caller callee = None callee_authid = None callee_authrole = None # we've maybe cancelled this invocation if invocation_request.canceled: # see if we're waiting to send the error back to the # other side (see note about race-condition in Yield) if invocation_request.error_msg: reply = invocation_request.error_msg invocation_request.error_msg = None else: if error.payload is None: # validate normal args/kwargs payload try: self._router.validate( "call_error", invocation_request.call.procedure, error.args, error.kwargs, validate=invocation_request.authorization.get("validate", None), ) except Exception as e: reply = message.Error( message.Call.MESSAGE_TYPE, invocation_request.call.request, ApplicationError.INVALID_ARGUMENT, [ "call error from procedure '{0}' with invalid application payload: {1}".format( invocation_request.call.procedure, e ) ], callee=callee, callee_authid=callee_authid, callee_authrole=callee_authrole, forward_for=forward_for, ) else: reply = message.Error( message.Call.MESSAGE_TYPE, invocation_request.call.request, error.error, args=error.args, kwargs=error.kwargs, callee=callee, callee_authid=callee_authid, callee_authrole=callee_authrole, forward_for=forward_for, ) else: reply = message.Error( message.Call.MESSAGE_TYPE, invocation_request.call.request, error.error, payload=error.payload, enc_algo=error.enc_algo, enc_key=error.enc_key, enc_serializer=error.enc_serializer, callee=callee, callee_authid=callee_authid, callee_authrole=callee_authrole, forward_for=forward_for, ) # the calling session might have been lost in the meantime .. # if invocation_request.caller._transport and reply: if self._router.is_traced: reply.correlation_id = invocation_request.call.correlation_id reply.correlation_uri = invocation_request.call.correlation_uri reply.correlation_is_anchor = False reply.correlation_is_last = True self._router.send(invocation_request.caller, reply) # the call is done # invoke = self._invocations[error.request] self._remove_invoke_request(invoke) else: self.log.debug( "Dealer.onInvocationError(): ERROR received for non-pending request_type {request_type}" " and request ID {request_id}", request_type=error.request_type, request_id=error.request, )