#####################################################################################
#
# Copyright (c) typedef int GmbH
# SPDX-License-Identifier: EUPL-1.2
#
#####################################################################################
import random
import txaio
from autobahn import util
from autobahn.exception import PayloadExceededError
from autobahn.wamp import message, role, types
from autobahn.wamp.exception import ApplicationError, ProtocolError
from autobahn.wamp.message import (
_URI_PAT_LOOSE_EMPTY,
_URI_PAT_LOOSE_LAST_EMPTY,
_URI_PAT_LOOSE_NON_EMPTY,
_URI_PAT_STRICT_EMPTY,
_URI_PAT_STRICT_LAST_EMPTY,
_URI_PAT_STRICT_NON_EMPTY,
)
from txaio import make_logger
from crossbar._util import hlflag, hlid, hltype
from crossbar.router import NotAttached, RouterOptions
from crossbar.router.observation import UriObservationMap
__all__ = ("Dealer",)
class InvocationRequest:
    """
    Book-keeping record for one in-flight invocation.

    Bundles the invocation ID with the registration, the caller and callee
    sessions, the originating call and the authorization stored alongside it.
    """

    __slots__ = (
        "id",
        "registration",
        "caller",
        "caller_session_id",
        "call",
        "callee",
        "forward_for",
        "authorization",
        "canceled",
        "error_msg",
        "timeout_call",
    )

    def __init__(self, id, registration, caller, call, callee, forward_for, authorization):
        # what is being invoked
        self.id, self.registration = id, registration
        self.call = call

        # who is involved; the caller's session ID is snapshotted right away
        # so it stays available independently of the session object
        self.caller, self.callee = caller, callee
        self.caller_session_id = caller._session_id

        # routing / permission context handed in by the dealer
        self.forward_for = forward_for
        self.authorization = authorization

        # mutable state, updated while the invocation is in flight
        self.canceled = False
        self.error_msg = None
        # handle of the pending timeout (if any), else None
        self.timeout_call = None
class RegistrationExtra:
    """
    Per-registration extra state stored in the UriObservationMap.
    """

    __slots__ = ("invoke", "roundrobin_current")

    def __init__(self, invoke=message.Register.INVOKE_SINGLE):
        # running counter used when picking a callee round-robin; starts at zero
        self.roundrobin_current = 0
        # invocation policy of the registration
        self.invoke = invoke
class RegistrationCalleeExtra:
    """
    Per-callee extra state stored in the UriObservationMap.
    """

    __slots__ = ("concurrency", "concurrency_current")

    def __init__(self, concurrency=None):
        # number of invocations currently counted against this callee
        self.concurrency_current = 0
        # configured concurrency limit (None when no limit was given)
        self.concurrency = concurrency

    def __repr__(self):
        template = "{}(concurrency={}, concurrency_current={})"
        class_name = type(self).__name__
        return template.format(class_name, self.concurrency, self.concurrency_current)
def _can_cancel(session, side="callee"):
"""
:returns: True if the session supports cancel
"""
return (
side in session._session_roles and session._session_roles[side] and session._session_roles[side].call_canceling
)
[docs]
class Dealer(object):
"""
Basic WAMP dealer.
"""
def __init__(self, router, reactor, options=None):
    """
    Set up the dealer and all of its (initially empty) tracking maps.

    :param router: The router this dealer is part of.
    :type router: Object that implements :class:`crossbar.router.interfaces.IRouter`.
    :param reactor: Reactor used to schedule deferred work via ``callLater``.
    :param options: Router options.
    :type options: Instance of :class:`crossbar.router.types.RouterOptions`.
    """
    # the router is consulted by every method of the dealer (sending,
    # authorization, tracing, realm lookup), so it must be kept around
    self._router = router
    self._reactor = reactor
    self._cancel_timers = txaio.make_batched_timer(1)  # timeouts have to be integers anyway
    self._options = options or RouterOptions()

    # generator for WAMP request IDs
    self._request_id_gen = util.IdGenerator()

    # registration map managed by this dealer
    self._registration_map = UriObservationMap(ordered=True)

    # map: session -> set of registrations (needed for detach)
    self._session_to_registrations = {}

    # BEWARE: the invocation maps below must be kept up-to-date together!
    # Use the helper methods _add_invoke_request and _remove_invoke_request

    # map: callee session -> in-flight invocations
    self._callee_to_invocations = {}

    # map: caller session -> in-flight invocations
    self._caller_to_invocations = {}

    # careful here: the 'request' IDs are unique per-session
    # (only) so we map from (session_id, call) tuples to in-flight invocations
    # map: (session_id, call) -> in-flight invocations
    self._invocations_by_call = {}

    # pending callee invocation requests
    # map: invocation request ID -> in-flight invocation
    self._invocations = {}

    # check all procedure URIs with strict rules
    self._option_uri_strict = self._options.uri_check == RouterOptions.URI_CHECK_STRICT

    # supported features from "WAMP Advanced Profile"
    self._role_features = role.RoleDealerFeatures(
        caller_identification=True,
        pattern_based_registration=True,
        session_meta_api=True,
        registration_meta_api=True,
        shared_registration=True,
        progressive_call_results=True,
        registration_revocation=True,
        payload_transparency=True,
        testament_meta_api=True,
        payload_encryption_cryptobox=True,
        call_canceling=True,
    )

    # store for call queues (None when the router has no usable store)
    if self._router._store:
        self._call_store = self._router._store
    else:
        self._call_store = None
[docs]
def attach(self, session):
"""
Implements :func:`crossbar.router.interfaces.IDealer.attach`
"""
if session not in self._session_to_registrations:
self._session_to_registrations[session] = set()
else:
raise Exception("session with ID {} already attached".format(session._session_id))
[docs]
def detach(self, session):
    """
    Implements :func:`crossbar.router.interfaces.IDealer.detach`

    Removes everything this dealer tracks for ``session``, in this order:

    1. in-flight invocations where the session is the *caller* (the callee
       is sent an INTERRUPT when it supports call canceling),
    2. in-flight invocations where the session is the *callee* (the caller
       is sent a CANCELED error if it still has a transport),
    3. all registrations held by the session (WAMP meta events are scheduled
       when a service session is available).

    :param session: The session that goes away.
    :raises NotAttached: If the session has no registration entry on this
        dealer. Caller-side invocation cleanup (step 1) has already been
        performed at that point.
    """
    is_rlink_session = session._authrole == "rlink"

    # if the caller on an in-flight invocation goes away
    # INTERRUPT the callee if supported
    if session in self._caller_to_invocations:
        # this needs to update all four places where we track invocations similar to _remove_invoke_request
        for invoke in self._caller_to_invocations[session]:
            if invoke.canceled:
                continue
            if invoke.callee is invoke.caller:
                # the session is calling itself - no need to notify; the
                # callee-side cleanup below takes care of this invocation
                continue

            callee = invoke.callee

            # Always clean up the invocation tracking regardless of call_canceling support
            if invoke.timeout_call:
                invoke.timeout_call.cancel()
                invoke.timeout_call = None

            callee_invokes = self._callee_to_invocations[callee]
            callee_invokes.remove(invoke)
            if not callee_invokes:
                del self._callee_to_invocations[callee]
            del self._invocations[invoke.id]
            del self._invocations_by_call[(invoke.caller_session_id, invoke.call.request)]

            # Only send INTERRUPT if the callee supports call canceling
            if _can_cancel(callee):
                self.log.debug(
                    "INTERRUPTing in-flight INVOKE with id={request} on"
                    " session {session} (caller went away)",
                    request=invoke.id,
                    session=session._session_id,
                )
                self._router.send(
                    invoke.callee,
                    message.Interrupt(
                        request=invoke.id,
                        mode=message.Cancel.KILLNOWAIT,
                    ),
                )
            else:
                self.log.debug(
                    "INTERRUPT not supported on in-flight INVOKE with id={request} on"
                    " session {session} (caller went away) - invocation cleaned up but not interrupted",
                    request=invoke.id,
                    session=session._session_id,
                )

        del self._caller_to_invocations[session]

    if session not in self._session_to_registrations:
        raise NotAttached("session with ID {} not attached".format(session._session_id))

    # send out Errors for any in-flight calls we have
    outstanding = self._callee_to_invocations.get(session, [])
    for invoke in outstanding:
        self.log.debug(
            "Cancelling in-flight INVOKE with id={request} on session {session}",
            request=invoke.call.request,
            session=session._session_id,
        )
        reply = message.Error(
            message.Call.MESSAGE_TYPE,
            invoke.call.request,
            ApplicationError.CANCELED,
            ["callee disconnected from in-flight request"],
        )
        # send this directly to the caller's session
        # (it is possible the caller was disconnected and thus
        # _transport is None before we get here though)
        if invoke.caller._transport:
            invoke.caller._transport.send(reply)

        if invoke.timeout_call:
            invoke.timeout_call.cancel()
            invoke.timeout_call = None

        # The caller's list may be gone already: the caller-side cleanup above
        # drops the whole list of a detaching caller while skipping canceled
        # and self-call invocations, which therefore can still show up here.
        caller_invokes = self._caller_to_invocations.get(invoke.caller)
        if caller_invokes is not None:
            if invoke in caller_invokes:
                caller_invokes.remove(invoke)
            if not caller_invokes:
                del self._caller_to_invocations[invoke.caller]

        del self._invocations[invoke.id]
        del self._invocations_by_call[(invoke.caller_session_id, invoke.call.request)]

    if outstanding:
        del self._callee_to_invocations[session]

    def _publish(registration, was_registered, was_last_callee):
        # The flags are passed in explicitly: this runs from the reactor after
        # the loop below has finished, so reading the loop variables from the
        # enclosing scope would yield the values of the last iteration only.
        service_session = self._router._realm.session

        # FIXME: what about exclude_authid as collected from forward_for? like we do elsewhere in this file!
        options = types.PublishOptions(correlation_id=None)

        if was_registered:
            service_session.publish(
                "wamp.registration.on_unregister",
                session._session_id,
                registration.id,
                options=options,
            )
        if was_last_callee and not is_rlink_session:
            service_session.publish(
                "wamp.registration.on_delete",
                session._session_id,
                registration.id,
                options=options,
            )

    for registration in self._session_to_registrations[session]:
        was_registered, was_last_callee = self._registration_map.drop_observer(session, registration)

        if was_registered and was_last_callee:
            self._registration_map.delete_observation(registration)

        # publish WAMP meta events, if we have a service session, but
        # not for the meta API itself!
        if self._router._realm and self._router._realm.session and not registration.uri.startswith("wamp."):
            # we postpone actual sending of meta events until we return to this client session
            self._reactor.callLater(0, _publish, registration, was_registered, was_last_callee)

    del self._session_to_registrations[session]
[docs]
def processRegister(self, session, register):
    """
    Implements :func:`crossbar.router.interfaces.IDealer.processRegister`

    Validates the procedure URI, asks the router to authorize the REGISTER
    and, once authorization succeeded, adds the session as a callee. The
    session receives either a REGISTERED acknowledgement or an ERROR reply;
    registration meta events are scheduled when a service session exists.

    :param session: The session that sent the REGISTER.
    :param register: The REGISTER message to process.
    """
    is_rlink_session = session._authrole == "rlink"

    def _send_error(error_uri, text):
        # every ERROR sent for this REGISTER is the last message of its correlation chain
        failure = message.Error(message.Register.MESSAGE_TYPE, register.request, error_uri, [text])
        failure.correlation_id = register.correlation_id
        failure.correlation_uri = register.procedure
        failure.correlation_is_anchor = False
        failure.correlation_is_last = True
        self._router.send(session, failure)

    if self._router.is_traced:
        if not register.correlation_id:
            register.correlation_id = self._router.new_correlation_id()
            register.correlation_is_anchor = True
            register.correlation_is_last = False
        if not register.correlation_uri:
            register.correlation_uri = register.procedure
        self._router._factory._worker._maybe_trace_rx_msg(session, register)

    # check procedure URI: for REGISTER, must be a valid URI (either strict or loose);
    # which URI pattern applies depends on the matching policy of the registration
    if self._option_uri_strict:
        uri_patterns = {
            "wildcard": _URI_PAT_STRICT_EMPTY,
            "prefix": _URI_PAT_STRICT_LAST_EMPTY,
            "exact": _URI_PAT_STRICT_NON_EMPTY,
        }
    else:
        uri_patterns = {
            "wildcard": _URI_PAT_LOOSE_EMPTY,
            "prefix": _URI_PAT_LOOSE_LAST_EMPTY,
            "exact": _URI_PAT_LOOSE_NON_EMPTY,
        }
    if register.match not in uri_patterns:
        # should not arrive here
        raise Exception("logic error")

    if not uri_patterns[register.match].match(register.procedure):
        _send_error(
            ApplicationError.INVALID_URI,
            "register for invalid procedure URI '{0}' (URI strict checking {1})".format(
                register.procedure, self._option_uri_strict
            ),
        )
        return

    # disallow registration of procedures starting with "wamp." other than for
    # trusted sessions (that are sessions built into Crossbar.io routing core)
    if session._authrole is not None and session._authrole != "trusted":
        if register.procedure.startswith("wamp."):
            _send_error(
                ApplicationError.INVALID_URI,
                "register for restricted procedure URI '{0}')".format(register.procedure),
            )
            return

    # authorize REGISTER action
    d = self._router.authorize(session, register.procedure, "register", options=register.marshal_options())

    def on_authorize_success(authorization):
        if not register.procedure.endswith(".on_log"):
            self.log.debug(
                '{func}::on_authorize_success() - permission {result} for REGISTER of procedure "{procedure}" [realm="{realm}", session_id={session_id}, authid="{authid}", authrole="{authrole}"]',
                func=hltype(self.processRegister),
                result=hlflag(authorization["allow"], "GRANTED", "DENIED"),
                procedure=hlid(register.procedure),
                realm=hlid(session._realm),
                session_id=hlid(session._session_id),
                authid=hlid(session._authid),
                authrole=hlid(session._authrole),
            )

        # check the authorization before ANYTHING else, otherwise
        # we may leak information about already-registered URIs etc.
        # A denied session gets its error reply and processing stops here.
        if not authorization["allow"]:
            _send_error(
                ApplicationError.NOT_AUTHORIZED,
                'session (session_id={}, authid="{}", authrole="{}") is not authorized to register procedure "{}" on realm "{}"'.format(
                    session._session_id, session._authid, session._authrole, register.procedure, session._realm
                ),
            )
            return

        # get existing registration for procedure / matching strategy - if any
        registration = self._registration_map.get_observation(register.procedure, register.match)

        # if the session disconnected while the authorization was
        # being checked, stop
        if session not in self._session_to_registrations:
            # if the session *really* disconnected, it won't have
            # a _session_id any longer, so we double-check
            if session._session_id is not None:
                self.log.error(
                    "Session '{session_id}' still appears valid, but isn't in registration map",
                    session_id=session._session_id,
                )
            self.log.info(
                "Session vanished while registering '{procedure}'",
                procedure=register.procedure,
            )
            # NOTE: the lookup above is by URI, not by session, so a registration
            # may well exist here; nothing is asserted about it
            return

        # if force_reregister was enabled, we only do any actual
        # kicking of existing registrations *after* authorization
        if registration and not register.force_reregister:
            # there is an existing registration, and that has an
            # invocation strategy that only allows a single callee
            # on the given registration
            if registration.extra.invoke == message.Register.INVOKE_SINGLE:
                _send_error(
                    ApplicationError.PROCEDURE_ALREADY_EXISTS,
                    "register for already registered procedure '{0}'".format(register.procedure),
                )
                return

            # there is an existing registration, and that has an
            # invocation strategy different from the one requested
            # by the new callee
            if registration.extra.invoke != register.invoke:
                _send_error(
                    ApplicationError.PROCEDURE_EXISTS_INVOCATION_POLICY_CONFLICT,
                    "register for already registered procedure '{0}' "
                    "with conflicting invocation policy (has {1} and "
                    "{2} was requested)".format(register.procedure, registration.extra.invoke, register.invoke),
                )
                return

        if register.force_reregister and registration:
            # iterate over a snapshot: observers are dropped from this very
            # registration inside the loop (presumably shrinking
            # registration.observers), and every current callee must be notified
            for obs in list(registration.observers):
                self._registration_map.drop_observer(obs, registration)
                kicked = message.Unregistered(
                    0,
                    registration=registration.id,
                    reason="wamp.error.unregistered",
                )
                kicked.correlation_id = register.correlation_id
                kicked.correlation_uri = register.procedure
                kicked.correlation_is_anchor = False
                kicked.correlation_is_last = False
                self._router.send(obs, kicked)
            self._registration_map.delete_observation(registration)

        # ok, session authorized to register. now get the registration
        registration_extra = RegistrationExtra(register.invoke)
        registration_callee_extra = RegistrationCalleeExtra(register.concurrency)
        registration, was_already_registered, is_first_callee = self._registration_map.add_observer(
            session, register.procedure, register.match, registration_extra, registration_callee_extra
        )

        if not was_already_registered:
            self._session_to_registrations[session].add(registration)

        # acknowledge register with registration ID
        reply = message.Registered(register.request, registration.id)
        reply.correlation_id = register.correlation_id
        reply.correlation_uri = register.procedure
        reply.correlation_is_anchor = False

        # publish WAMP meta events, if we have a service session, but
        # not for the meta API itself!
        if (
            self._router._realm
            and self._router._realm.session
            and not registration.uri.startswith("wamp.")
            and (is_first_callee or not was_already_registered)
        ):
            reply.correlation_is_last = False

            # when this message was forwarded from other nodes, exclude all such nodes
            # from receiving the meta event we'll publish below by authid (of the r2r link
            # from the forwarding node connected to this router node)
            exclude_authid = None
            if register.forward_for:
                exclude_authid = [ff["authid"] for ff in register.forward_for]
                self.log.info(
                    "WAMP meta event will be published excluding these authids (from forward_for): {exclude_authid}",
                    exclude_authid=exclude_authid,
                )

            def _publish():
                service_session = self._router._realm.session

                if exclude_authid or self._router.is_traced:
                    options = types.PublishOptions(
                        correlation_id=register.correlation_id,
                        correlation_is_anchor=False,
                        correlation_is_last=False,
                        exclude_authid=exclude_authid,
                    )
                else:
                    options = None

                if is_first_callee:
                    registration_details = {
                        "id": registration.id,
                        "created": registration.created,
                        "uri": registration.uri,
                        "match": registration.match,
                        "invoke": registration.extra.invoke,
                    }
                    if not is_rlink_session:
                        service_session.publish(
                            "wamp.registration.on_create",
                            session._session_id,
                            registration_details,
                            options=options,
                        )

                if not was_already_registered:
                    if options:
                        options.correlation_is_last = True
                    if not is_rlink_session:
                        service_session.publish(
                            "wamp.registration.on_register",
                            session._session_id,
                            registration.id,
                            options=options,
                        )

            # we postpone actual sending of meta events until we return to this client session
            self._reactor.callLater(0, _publish)
        else:
            reply.correlation_is_last = True

        # send out reply to register requestor
        self._router.send(session, reply)

    def on_authorize_error(err):
        """
        the call to authorize the action _itself_ failed (note this is
        different from the call to authorize succeed, but the
        authorization being denied)
        """
        self.log.failure("Authorization of 'register' for '{uri}' failed", uri=register.procedure, failure=err)
        _send_error(
            ApplicationError.AUTHORIZATION_FAILED,
            "failed to authorize session for registering procedure '{0}': {1}".format(
                register.procedure, err.value
            ),
        )

    txaio.add_callbacks(d, on_authorize_success, on_authorize_error)
[docs]
def processUnregister(self, session, unregister):
    """
    Implements :func:`crossbar.router.interfaces.IDealer.processUnregister`

    Replies with UNREGISTERED when ``session`` was a callee on the given
    registration, and with a NO_SUCH_REGISTRATION error otherwise.
    """
    router = self._router

    if router.is_traced:
        if not unregister.correlation_id:
            unregister.correlation_id = router.new_correlation_id()
            unregister.correlation_is_anchor = True
            unregister.correlation_is_last = False

    # look up the registration by its ID; None if this dealer does not know it
    registration = self._registration_map.get_observation_by_id(unregister.registration)

    if registration and router.is_traced and not unregister.correlation_uri:
        unregister.correlation_uri = registration.uri

    if registration and session in registration.observers:
        outcome = self._unregister(registration, session, unregister)
        has_follow_up_messages = outcome[2]

        reply = message.Unregistered(unregister.request)
        if router.is_traced:
            reply.correlation_uri = registration.uri
            reply.correlation_is_last = not has_follow_up_messages
    else:
        # either the registration is unknown to this dealer, or it exists but
        # the requesting session is not one of its callees - same answer for both
        reply = message.Error(
            message.Unregister.MESSAGE_TYPE, unregister.request, ApplicationError.NO_SUCH_REGISTRATION
        )
        if router.is_traced:
            reply.correlation_uri = reply.error
            reply.correlation_is_last = True

    if router.is_traced:
        router._factory._worker._maybe_trace_rx_msg(session, unregister)
        reply.correlation_id = unregister.correlation_id
        reply.correlation_is_anchor = False

    router.send(session, reply)
[docs]
def _unregister(self, registration, session, unregister=None):
# drop session from registration observers
#
was_registered, was_last_callee = self._registration_map.drop_observer(session, registration)
was_deleted = False
is_rlink_session = session._authrole == "rlink"
if was_registered and was_last_callee:
self._registration_map.delete_observation(registration)
was_deleted = True
# remove registration from session->registrations map
#
if was_registered:
self._session_to_registrations[session].discard(registration)
# publish WAMP meta events, if we have a service session, but
# not for the meta API itself!
#
if (
self._router._realm
and self._router._realm.session
and not registration.uri.startswith("wamp.")
and (was_registered or was_deleted)
):
has_follow_up_messages = True
# when this message was forwarded from other nodes, exclude all such nodes
# from receiving the meta event we'll publish below by authid (of the r2r link
# from the forwarding node connected to this router node)
exclude_authid = None
if unregister.forward_for:
exclude_authid = [ff["authid"] for ff in unregister.forward_for]
self.log.info(
"WAMP meta event will be published excluding these authids (from forward_for): {exclude_authid}",
exclude_authid=exclude_authid,
)
def _publish():
service_session = self._router._realm.session
if unregister and (exclude_authid and self._router.is_traced):
options = types.PublishOptions(
correlation_id=unregister.correlation_id,
correlation_is_anchor=False,
correlation_is_last=False,
exclude_authid=exclude_authid,
)
else:
options = None
if was_registered:
service_session.publish(
"wamp.registration.on_unregister", session._session_id, registration.id, options=options
)
if was_deleted:
if options:
options.correlation_is_last = True
if not is_rlink_session:
service_session.publish(
"wamp.registration.on_delete", session._session_id, registration.id, options=options
)
# we postpone actual sending of meta events until we return to this client session
self._reactor.callLater(0, _publish)
else:
has_follow_up_messages = False
return was_registered, was_last_callee, has_follow_up_messages
[docs]
def removeCallee(self, registration, session, reason=None):
"""
Actively unregister a callee session from a registration.
"""
was_registered, was_last_callee, _ = self._unregister(registration, session)
# actively inform the callee that it has been unregistered
#
if (
"callee" in session._session_roles
and session._session_roles["callee"]
and session._session_roles["callee"].registration_revocation
):
reply = message.Unregistered(0, registration=registration.id, reason=reason)
reply.correlation_uri = registration.uri
self._router.send(session, reply)
return was_registered, was_last_callee
[docs]
def processCall(self, session, call: message.Call):
    """
    Implements :func:`crossbar.router.interfaces.IDealer.processCall`

    Checks the procedure URI, asks the router to authorize the CALL and,
    when permitted, validates the payload and hands the call over to
    :meth:`_call` for the best matching registration. Every failure along
    the way is answered with an ERROR reply to the calling session.
    """

    def _reject(error_uri, text):
        # an ERROR for this CALL always ends its correlation chain
        failure = message.Error(message.Call.MESSAGE_TYPE, call.request, error_uri, [text])
        failure.correlation_id = call.correlation_id
        failure.correlation_uri = call.procedure
        failure.correlation_is_anchor = False
        failure.correlation_is_last = True
        self._router.send(session, failure)

    if self._router.is_traced:
        if not call.correlation_id:
            call.correlation_id = self._router.new_correlation_id()
            call.correlation_is_anchor = True
            call.correlation_is_last = False
        if not call.correlation_uri:
            call.correlation_uri = call.procedure
        self._router._factory._worker._maybe_trace_rx_msg(session, call)

    # a CALL needs a valid procedure URI (strict or loose, as configured)
    # with all URI components non-empty
    uri_pattern = _URI_PAT_STRICT_NON_EMPTY if self._option_uri_strict else _URI_PAT_LOOSE_NON_EMPTY
    if not uri_pattern.match(call.procedure):
        _reject(
            ApplicationError.INVALID_URI,
            "call with invalid procedure URI '{0}' (URI strict checking {1})".format(
                call.procedure, self._option_uri_strict
            ),
        )
        return

    # authorize CALL action: the router hands back an authorization dict
    # (see on_authorize_success below), which is passed on to _call()
    d = self._router.authorize(session, call.procedure, "call", options=call.marshal_options())

    def on_authorize_success(authorization):
        # The authorization request itself went through; `authorization` is a
        # dict from which the "allow" flag and the optional "validate" stanza
        # are read here, e.g.:
        #
        #   {
        #       'allow': True,
        #       'disclose': False,
        #       'validate': {
        #           'catalog': 'pydefi.eth',
        #           'args': None,
        #           'kwargs': None,
        #           'results': ['Address'],
        #           'kwresults': None,
        #       },
        #       'meta': {'args': None, 'kwargs': {'clock_oid': _clock_oid}},
        #       'cache': True,
        #   }
        if not call.procedure.endswith(".on_log"):
            self.log.debug(
                '{func}::on_authorize_success() - permission {result} for CALL of procedure "{procedure}" [realm="{realm}", session_id={session_id}, authid="{authid}", authrole="{authrole}"]',
                func=hltype(self.processCall),
                result=hlflag(authorization["allow"], "GRANTED", "DENIED"),
                procedure=hlid(call.procedure),
                realm=hlid(session._realm),
                session_id=hlid(session._session_id),
                authid=hlid(session._authid),
                authrole=hlid(session._authrole),
            )

        # permission denied: tell the caller and stop
        if not authorization["allow"]:
            _reject(
                ApplicationError.NOT_AUTHORIZED,
                'session (session_id={}, authid="{}", authrole="{}") is not authorized to '
                'call procedure "{}" on realm "{}"'.format(
                    session._session_id, session._authid, session._authrole, call.procedure, session._realm
                ),
            )
            return

        # validate payload (skip in "payload_transparency" mode)
        if call.payload is None:
            try:
                self._router.validate(
                    "call",
                    call.procedure,
                    call.args,
                    call.kwargs,
                    validate=authorization.get("validate", None),
                )
            except Exception as e:
                _reject(
                    ApplicationError.INVALID_ARGUMENT,
                    "call of procedure '{0}' with invalid application payload: {1}".format(call.procedure, e),
                )
                return

        # find the registration that best matches the called procedure
        registration = self._registration_map.best_matching_observation(call.procedure)
        if not registration:
            _reject(
                ApplicationError.NO_SUCH_PROCEDURE,
                "no callee registered for procedure <{0}>".format(call.procedure),
            )
            return

        # now actually perform the invocation of the callee
        self._call(session, call, registration, authorization)

    def on_authorize_error(err):
        """
        the call to authorize the action _itself_ failed (note this is
        different from the call to authorize succeed, but the
        authorization being denied)
        """
        self.log.failure("Authorization of 'call' for '{uri}' failed", uri=call.procedure, failure=err)
        _reject(
            ApplicationError.AUTHORIZATION_FAILED,
            "failed to authorize session for calling procedure '{0}': {1}".format(call.procedure, err.value),
        )

    txaio.add_callbacks(d, on_authorize_success, on_authorize_error)
[docs]
def _call(self, session, call, registration, authorization, is_queued_call=False):
    """
    Pick a callee for ``call`` according to the invocation policy of
    ``registration`` and forward the call to that callee as an INVOCATION.

    :param session: Session the CALL arrived on. It is treated as the caller,
        unless ``call.forward_for`` is set, in which case the first entry of
        that list names the ultimate caller.
    :param call: The CALL message being routed.
    :param registration: Registration selected for ``call.procedure``.
    :param authorization: Authorization dict of the call; ``"disclose"`` is
        read here and the dict is stored with the invocation request.
    :param is_queued_call: ``True`` when re-trying a call taken from the call
        store. Such a call is neither offered to the call store again nor
        answered with an ERROR when no callee has free concurrency.

    :returns: ``True`` when an INVOCATION was sent to a callee, ``False`` when
        every eligible callee was at its maximum concurrency.
    :raises Exception: if the registration carries an unknown invocation policy.
    """

    def _defer_or_reject(detail):
        # No callee has capacity: a call that is already queued simply stays
        # queued, a fresh call is offered to the call store (if there is one),
        # and only when that declines is the caller sent an ERROR.
        if is_queued_call or (
            self._call_store and self._call_store.maybe_queue_call(session, call, registration, authorization)
        ):
            return
        reply = message.Error(
            message.Call.MESSAGE_TYPE,
            call.request,
            "crossbar.error.max_concurrency_reached",
            [detail],
        )
        reply.correlation_id = call.correlation_id
        reply.correlation_uri = call.procedure
        reply.correlation_is_anchor = False
        reply.correlation_is_last = True
        self._router.send(session, reply)

    # will hold the callee (the concrete endpoint) that we will forward the call to ..
    #
    callee = None
    callee_extra = None

    policy = registration.extra.invoke
    observers = registration.observers

    # determine callee according to invocation policy
    #
    if policy in (
        message.Register.INVOKE_SINGLE,
        message.Register.INVOKE_FIRST,
        message.Register.INVOKE_LAST,
    ):
        # a single endpoint is considered for forwarding the call ..
        callee = observers[-1] if policy == message.Register.INVOKE_LAST else observers[0]

        # check maximum concurrency of the (single) endpoint
        callee_extra = registration.observers_extra.get(callee, None)
        if callee_extra:
            if callee_extra.concurrency and callee_extra.concurrency_current >= callee_extra.concurrency:
                _defer_or_reject(
                    "maximum concurrency {} of callee/endpoint reached (on "
                    "non-shared/single registration)".format(callee_extra.concurrency)
                )
                return False
            callee_extra.concurrency_current += 1

    elif policy == message.Register.INVOKE_ROUNDROBIN:
        observer_count = len(observers)

        # remember where we started to search for a suitable callee/endpoint in the
        # round-robin list of callee endpoints
        roundrobin_start_index = registration.extra.roundrobin_current % observer_count

        # now search for a suitable callee/endpoint
        while True:
            callee = observers[registration.extra.roundrobin_current % observer_count]
            callee_extra = registration.observers_extra.get(callee, None)
            registration.extra.roundrobin_current += 1

            if not (callee_extra and callee_extra.concurrency):
                # this callee hasn't set a maximum concurrency, and hence is always
                # eligible for having a call forwarded to
                break
            if callee_extra.concurrency_current < callee_extra.concurrency:
                # this callee has set a maximum concurrency that has not yet been reached
                break
            if registration.extra.roundrobin_current % observer_count == roundrobin_start_index:
                # we've looked through the whole round-robin list, and didn't find a
                # callee with free concurrency
                _defer_or_reject(
                    "maximum concurrency {} of all callee/endpoints reached (on "
                    "round-robin registration)".format(callee_extra.concurrency)
                )
                return False
            # .. otherwise search on ..

        if callee_extra:
            callee_extra.concurrency_current += 1

    elif policy == message.Register.INVOKE_RANDOM:
        # FIXME: implement max. concurrency and call queueing
        callee = random.choice(observers)

        # Count the invocation against the callee even though the limit is not
        # enforced for this policy: the counter is decremented unconditionally
        # once the callee answers, so it must be incremented here to stay balanced.
        callee_extra = registration.observers_extra.get(callee, None)
        if callee_extra:
            callee_extra.concurrency_current += 1

    else:
        # should not arrive here
        raise Exception("logic error")

    # new ID for the invocation
    #
    invocation_request_id = self._request_id_gen.next()

    # caller disclosure: requested via the authorization, or implied for
    # procedures in the "wamp." and "crossbar." namespaces
    #
    disclose = bool(authorization.get("disclose", False)) or call.procedure.startswith(("wamp.", "crossbar."))

    # set the disclosed caller and forward_for
    #
    forward_for = None
    caller = None
    caller_authid = None
    caller_authrole = None
    if disclose:
        if call.forward_for:
            # forwarded call: ultimate caller is the first in forward_for
            caller = call.forward_for[0]["session"]
            caller_authid = call.forward_for[0]["authid"]
            caller_authrole = call.forward_for[0]["authrole"]

            # append this session (a r2r link) to forward_for
            assert session._session_id is not None
            forward_for = call.forward_for + [
                {
                    "session": session._session_id,
                    "authid": session._authid,
                    "authrole": session._authrole,
                }
            ]
        else:
            # non-forwarded call: ultimate caller is this session
            caller = session._session_id
            caller_authid = session._authid
            caller_authrole = session._authrole

    # for pattern-based registrations, the INVOCATION must contain
    # the actual procedure being called
    #
    if registration.match != message.Register.MATCH_EXACT:
        procedure = call.procedure
    else:
        procedure = None

    # keyword arguments shared by both payload flavours of the INVOCATION
    invocation_details = dict(
        timeout=call.timeout,
        receive_progress=call.receive_progress,
        caller=caller,
        caller_authid=caller_authid,
        caller_authrole=caller_authrole,
        procedure=procedure,
        transaction_hash=call.transaction_hash,
        forward_for=forward_for,
    )
    if call.payload:
        # the CALL carries call.payload plus enc_* attributes: pass them on as they are
        invocation_details.update(
            payload=call.payload,
            enc_algo=call.enc_algo,
            enc_key=call.enc_key,
            enc_serializer=call.enc_serializer,
        )
    else:
        invocation_details.update(args=call.args, kwargs=call.kwargs)

    invocation = message.Invocation(invocation_request_id, registration.id, **invocation_details)
    invocation.correlation_id = call.correlation_id
    invocation.correlation_uri = call.procedure
    invocation.correlation_is_anchor = False
    invocation.correlation_is_last = False

    # for internal use by rlinks
    invocation._router_internal = (
        session._session_id,  # caller
        session._authid,
        session._authrole,
    )

    # track the invocation before it leaves, so a reply can be correlated
    self._add_invoke_request(
        invocation_request_id,
        registration,
        session,
        call,
        callee,
        forward_for,
        authorization,
        timeout=call.timeout,
    )
    self._router.send(callee, invocation)
    return True
[docs]
def _add_invoke_request(
    self, invocation_request_id, registration, session, call, callee, forward_for, authorization, timeout=None
):
    """
    Internal helper. Creates an InvocationRequest and records it in all four
    tracking maps: ``_invocations``, ``_invocations_by_call``,
    ``_callee_to_invocations`` and ``_caller_to_invocations``.

    :param invocation_request_id: ID of the INVOCATION; key into ``_invocations``.
    :param registration: Registration the invocation is made on.
    :param session: Session that issued the call (stored as the caller).
    :param call: The CALL message being forwarded.
    :param callee: Session the INVOCATION is sent to.
    :param forward_for: Forwarding chain stored with the request (may be ``None``).
    :param authorization: Authorization dict stored with the request.
    :param timeout: When truthy, a timer is scheduled on ``_cancel_timers``
        that cancels the invocation on both sides once it fires.

    :returns: The newly created InvocationRequest.
    """
    invoke_request = InvocationRequest(
        invocation_request_id, registration, session, call, callee, forward_for, authorization
    )
    self._invocations[invocation_request_id] = invoke_request
    self._invocations_by_call[session._session_id, call.request] = invoke_request

    # maps to keep track of the invocations by each callee and by each caller
    self._callee_to_invocations.setdefault(callee, []).append(invoke_request)
    self._caller_to_invocations.setdefault(session, []).append(invoke_request)

    # deal with possible timeouts
    # NB: timeouts can only be integers (check spec, but this is
    # what Autobahn code says) so we can just have a bucket-based
    # thing (and probably should?) instead of "straight" callLater
    if timeout:

        def _cancel_both_sides():
            """
            The timeout was reached; send an ERROR to the caller and INTERRUPT
            to the callee
            """
            if _can_cancel(invoke_request.caller, "caller"):
                self._router.send(
                    invoke_request.caller,
                    message.Error(
                        message.Call.MESSAGE_TYPE,
                        call.request,
                        ApplicationError.CANCELED,
                        ["timeout reached"],
                    ),
                )
            if _can_cancel(invoke_request.callee, "callee"):
                self._router.send(
                    invoke_request.callee,
                    message.Interrupt(
                        invoke_request.id,
                        message.Cancel.KILLNOWAIT,  # or KILL ?
                    ),
                )

            # The request is forgotten right below, so a late YIELD/ERROR from
            # the callee will no longer be matched to it. Release the callee's
            # concurrency slot here, otherwise it would stay occupied for good.
            callee_extra = invoke_request.registration.observers_extra.get(invoke_request.callee, None)
            if callee_extra and callee_extra.concurrency_current > 0:
                callee_extra.concurrency_current -= 1

            self._remove_invoke_request(invoke_request)

        invoke_request.timeout_call = self._cancel_timers.call_later(timeout, _cancel_both_sides)

    return invoke_request
[docs]
def _remove_invoke_request(self, invocation_request):
    """
    Internal helper. Cancels the timeout timer of an InvocationRequest (when
    one is pending) and forgets the request in the _invocations,
    _callee_to_invocations, _caller_to_invocations and _invocations_by_call
    maps. A request that is not (or no longer) tracked leaves the maps as
    they are.
    """
    pending_timer = invocation_request.timeout_call
    if pending_timer:
        pending_timer.cancel()
        invocation_request.timeout_call = None

    if invocation_request.id not in self._invocations:
        return

    # all four places should always be updated together
    del self._invocations[invocation_request.id]

    per_session_indexes = (
        (self._callee_to_invocations, invocation_request.callee),
        (self._caller_to_invocations, invocation_request.caller),
    )
    for index, owner in per_session_indexes:
        outstanding = index[owner]
        outstanding.remove(invocation_request)
        if not outstanding:
            # last invocation of that session: drop the now empty list
            del index[owner]

    del self._invocations_by_call[invocation_request.caller_session_id, invocation_request.call.request]
# noinspection PyUnusedLocal
[docs]
def processCancel(self, session, cancel):
    """
    Implements :func:`crossbar.router.interfaces.IDealer.processCancel`

    A caller asks to cancel a call that is still outstanding, in one of the
    modes "skip", "kill" or "killnowait". A CANCEL for which no invocation
    is tracked, or for an invocation that was already canceled, is ignored.
    """
    call_key = (session._session_id, cancel.request)
    if call_key not in self._invocations_by_call:
        return
    invocation_request = self._invocations_by_call[call_key]

    if self._router.is_traced:
        # correlate the cancel request to the original call
        cancel.correlation_id = invocation_request.call.correlation_id
        cancel.correlation_uri = invocation_request.call.procedure
        cancel.correlation_is_anchor = False
        cancel.correlation_is_last = False
        self._router._factory._worker._maybe_trace_rx_msg(session, cancel)

    # for those that repeatedly push elevator buttons
    if invocation_request.canceled:
        return
    invocation_request.canceled = True

    # "skip" or "kill" or "killnowait" (see WAMP section.14.3.4); a callee
    # that can't deal with an "Interrupt" is always handled as "skip"
    requested_mode = cancel.mode
    if _can_cancel(invocation_request.callee, "callee"):
        cancellation_mode = requested_mode
    else:
        cancellation_mode = message.Cancel.SKIP

    # What goes out depends on the mode:
    #   - "skip":       no Interrupt to the callee, Error to the caller right away
    #   - "kill":       Interrupt to the callee, Error to the caller only once
    #                   the callee has answered
    #   - "killnowait": Interrupt to the callee and Error to the caller right away
    #
    # Whenever the Error went out right away, whatever the callee still
    # answers for this invocation must be ignored.
    # (XXX need test for ^ case)
    if cancellation_mode != message.Cancel.SKIP:
        # FIXME: disclosure is unconditionally switched on here
        forward_for = None
        if cancel.forward_for:
            # append this calling session (a r2r link) to forward_for
            assert session._session_id is not None
            this_hop = {
                "session": session._session_id,
                "authid": session._authid,
                "authrole": session._authrole,
            }
            forward_for = cancel.forward_for + [this_hop]

        # the interrupt carries the mode: "kill" or "killnowait"
        interrupt = message.Interrupt(invocation_request.id, cancellation_mode, forward_for=forward_for)
        if self._router.is_traced:
            interrupt.correlation_id = invocation_request.call.correlation_id
            interrupt.correlation_uri = invocation_request.call.procedure
            interrupt.correlation_is_anchor = False
            interrupt.correlation_is_last = False
        self._router.send(invocation_request.callee, interrupt)

    # the caller gets an "Error" -- *when* depends on the mode
    error_msg = message.Error(
        message.Call.MESSAGE_TYPE,
        cancel.request,
        ApplicationError.CANCELED,
    )
    if cancellation_mode == message.Cancel.KILL:  # not SKIP, not KILLNOWAIT
        # park the message; it is sent once the callee has answered
        invocation_request.error_msg = error_msg
        return

    # send the message immediately
    self._router.send(session, error_msg)
[docs]
def processYield(self, session, yield_):
    """
    Implements :func:`crossbar.router.interfaces.IDealer.processYield`

    Handles a YIELD sent by a callee: turns it into a RESULT (or an ERROR)
    for the caller, and once the call is complete releases the callee's
    concurrency slot, forgets the invocation and tries to dispatch one call
    waiting in the call store.

    :raises ProtocolError: if ``session`` is not the callee of the invocation.
    """
    # assert(session in self._session_to_registrations)

    if yield_.request not in self._invocations:
        self.log.debug(
            "Dealer.onYield(): YIELD received for non-pending request ID {request_id}", request_id=yield_.request
        )
        return

    # get the invocation request tracked for the caller
    #
    invocation_request = self._invocations[yield_.request]

    if self._router.is_traced:
        # correlate the received yield return to the original call
        yield_.correlation_id = invocation_request.call.correlation_id
        yield_.correlation_uri = invocation_request.call.correlation_uri
        yield_.correlation_is_anchor = False
        yield_.correlation_is_last = False
        self._router._factory._worker._maybe_trace_rx_msg(session, yield_)

    # check to make sure this session is the one that is supposed to be yielding
    if invocation_request.callee is not session:
        raise ProtocolError("Dealer.onYield(): YIELD received for non-owned request ID {0}".format(yield_.request))

    # set the disclosed callee and forward_for
    # FIXME: callee disclosure is unconditionally switched on here
    #
    if yield_.forward_for:
        # forwarded call result: ultimate callee is the first in forward_for
        origin = yield_.forward_for[0]
        callee = origin["session"]
        callee_authid = origin["authid"]
        callee_authrole = origin["authrole"]

        # append this session (a r2r link) to forward_for
        assert session._session_id is not None
        forward_for = yield_.forward_for + [
            {
                "session": session._session_id,
                "authid": session._authid,
                "authrole": session._authrole,
            }
        ]
    else:
        # non-forwarded call result: ultimate callee is this session
        callee = session._session_id
        callee_authid = session._authid
        callee_authrole = session._authrole
        forward_for = None

    # keyword arguments every reply built below has in common
    disclosure = dict(
        callee=callee,
        callee_authid=callee_authid,
        callee_authrole=callee_authrole,
        forward_for=forward_for,
    )

    reply = None
    if invocation_request.canceled:
        # The invocation was canceled. If an Error is still parked for the
        # caller, it goes out now. (Logically, a callee that supports
        # cancelling shouldn't send a Yield after a Cancel -- but the Cancel
        # and the Yield may have crossed on the wire.)
        if invocation_request.error_msg:
            reply, invocation_request.error_msg = invocation_request.error_msg, None

        # Some response from the callee arrived, so we're done. For cancel
        # mode "skip" no Error will ever come back; in the crossing case
        # above one will follow "soon".
        call_complete = True

    elif yield_.payload is not None:
        # the YIELD carries yield_.payload plus enc_* attributes: pass them on as they are
        reply = message.Result(
            invocation_request.call.request,
            payload=yield_.payload,
            progress=yield_.progress,
            enc_algo=yield_.enc_algo,
            enc_key=yield_.enc_key,
            enc_serializer=yield_.enc_serializer,
            **disclosure
        )
        # a progressive result keeps the call open
        call_complete = not yield_.progress

    else:
        # validate normal args/kwargs payload
        try:
            self._router.validate(
                "call_result",
                invocation_request.call.procedure,
                yield_.args,
                yield_.kwargs,
                validate=invocation_request.authorization.get("validate", None),
            )
        except Exception as e:
            reply = message.Error(
                message.Call.MESSAGE_TYPE,
                invocation_request.call.request,
                ApplicationError.INVALID_ARGUMENT,
                [
                    "call result from procedure '{0}' with invalid application payload: {1}".format(
                        invocation_request.call.procedure, e
                    )
                ],
                **disclosure
            )
            # an invalid payload ends the call, progressive or not
            call_complete = True
        else:
            reply = message.Result(
                invocation_request.call.request,
                args=yield_.args,
                kwargs=yield_.kwargs,
                progress=yield_.progress,
                **disclosure
            )
            # a progressive result keeps the call open
            call_complete = not yield_.progress

    # the calling session might have been lost in the meantime ..
    #
    caller_session = invocation_request.caller
    if caller_session._transport and reply:
        if self._router.is_traced:
            reply.correlation_id = invocation_request.call.correlation_id
            reply.correlation_uri = invocation_request.call.correlation_uri
            reply.correlation_is_anchor = False
            reply.correlation_is_last = call_complete
        try:
            self._router.send(caller_session, reply)
        except PayloadExceededError as e:
            # cannot send result to original caller as the result size surpasses the
            # transport limit (the maximum message size the original calling client is
            # willing to receive): end the call with an error instead
            call_complete = True
            reply = message.Error(
                message.Call.MESSAGE_TYPE,
                invocation_request.call.request,
                ApplicationError.INVALID_ARGUMENT,
                [
                    "call result from procedure '{0}' with invalid application payload: {1}".format(
                        invocation_request.call.procedure, e
                    )
                ],
                **disclosure
            )
            reply.correlation_id = invocation_request.call.correlation_id
            reply.correlation_uri = invocation_request.call.correlation_uri
            reply.correlation_is_anchor = False
            reply.correlation_is_last = call_complete
            self._router.send(caller_session, reply)

    if not call_complete:
        return

    # reduce current concurrency on callee
    callee_extra = invocation_request.registration.observers_extra.get(session, None)
    if callee_extra:
        callee_extra.concurrency_current -= 1

    # cleanup the (individual) invocation
    self._remove_invoke_request(invocation_request)

    # An invocation just returned, so there is likely concurrency free again:
    # try to forward a call that was queued on this registration because no
    # callee endpoint had capacity.
    if not self._call_store:
        return
    queued_call = self._call_store.get_queued_call(invocation_request.registration)
    if not queued_call:
        return
    invocation_sent = self._call(
        queued_call.session,
        queued_call.call,
        queued_call.registration,
        queued_call.authorization,
        True,
    )
    # only actually pop the queued call when we really were
    # able to forward the call now
    if invocation_sent:
        self._call_store.pop_queued_call(invocation_request.registration)
[docs]
def processInvocationError(self, session, error):
    """
    Implements :func:`crossbar.router.interfaces.IDealer.processInvocationError`

    Handles an ERROR sent by a callee in answer to an INVOCATION. The error
    (or, for a canceled invocation, the Error parked for the caller) is
    relayed to the caller if that session still has a transport. An error
    always ends the call: the callee's concurrency slot is released, the
    invocation is forgotten and one call waiting in the call store (if any)
    is given a chance to be dispatched.

    :param session: Session the ERROR was received from.
    :param error: The ERROR message; ``error.request`` is the invocation ID.
    """
    # assert(session in self._session_to_registrations)

    if error.request not in self._invocations:
        self.log.debug(
            "Dealer.onInvocationError(): ERROR received for non-pending request_type {request_type}"
            " and request ID {request_id}",
            request_type=error.request_type,
            request_id=error.request,
        )
        return

    # get the invocation request tracked for the caller
    invocation_request = self._invocations[error.request]

    if self._router.is_traced:
        # correlate the received invocation error to the original call
        error.correlation_id = invocation_request.call.correlation_id
        error.correlation_uri = invocation_request.call.correlation_uri
        error.correlation_is_anchor = False
        error.correlation_is_last = False
        self._router._factory._worker._maybe_trace_rx_msg(session, error)

    # if concurrency is enabled on this, an error counts as
    # "an answer" so we decrement.
    callee_extra = invocation_request.registration.observers_extra.get(session, None)
    if callee_extra:
        callee_extra.concurrency_current -= 1

    reply = None

    # FIXME
    disclose = False

    # set the disclosed callee and forward_for
    #
    forward_for = None
    if disclose:
        if error.forward_for:
            # forwarded error: ultimate callee is the first in the forward_for
            # chain of the ERROR itself (the chain stored on the invocation
            # request is the one of the CALL, i.e. it describes the caller side)
            callee = error.forward_for[0]["session"]
            callee_authid = error.forward_for[0]["authid"]
            callee_authrole = error.forward_for[0]["authrole"]

            # append this session (a r2r link) to forward_for
            assert session._session_id is not None
            forward_for = error.forward_for + [
                {
                    "session": session._session_id,
                    "authid": session._authid,
                    "authrole": session._authrole,
                }
            ]
        else:
            # non-forwarded error: ultimate callee is this session
            callee = session._session_id
            callee_authid = session._authid
            callee_authrole = session._authrole
    else:
        # callee session isn't disclosed to the caller
        callee = None
        callee_authid = None
        callee_authrole = None

    # we've maybe cancelled this invocation
    if invocation_request.canceled:
        # If an Error is still parked for the caller, it is sent instead of
        # the callee's error; without a parked Error nothing is relayed.
        if invocation_request.error_msg:
            reply = invocation_request.error_msg
            invocation_request.error_msg = None
    elif error.payload is None:
        # validate normal args/kwargs payload
        try:
            self._router.validate(
                "call_error",
                invocation_request.call.procedure,
                error.args,
                error.kwargs,
                validate=invocation_request.authorization.get("validate", None),
            )
        except Exception as e:
            reply = message.Error(
                message.Call.MESSAGE_TYPE,
                invocation_request.call.request,
                ApplicationError.INVALID_ARGUMENT,
                [
                    "call error from procedure '{0}' with invalid application payload: {1}".format(
                        invocation_request.call.procedure, e
                    )
                ],
                callee=callee,
                callee_authid=callee_authid,
                callee_authrole=callee_authrole,
                forward_for=forward_for,
            )
        else:
            reply = message.Error(
                message.Call.MESSAGE_TYPE,
                invocation_request.call.request,
                error.error,
                args=error.args,
                kwargs=error.kwargs,
                callee=callee,
                callee_authid=callee_authid,
                callee_authrole=callee_authrole,
                forward_for=forward_for,
            )
    else:
        # the ERROR carries error.payload plus enc_* attributes: pass them on as they are
        reply = message.Error(
            message.Call.MESSAGE_TYPE,
            invocation_request.call.request,
            error.error,
            payload=error.payload,
            enc_algo=error.enc_algo,
            enc_key=error.enc_key,
            enc_serializer=error.enc_serializer,
            callee=callee,
            callee_authid=callee_authid,
            callee_authrole=callee_authrole,
            forward_for=forward_for,
        )

    # the calling session might have been lost in the meantime ..
    #
    if invocation_request.caller._transport and reply:
        if self._router.is_traced:
            reply.correlation_id = invocation_request.call.correlation_id
            reply.correlation_uri = invocation_request.call.correlation_uri
            reply.correlation_is_anchor = False
            reply.correlation_is_last = True
        self._router.send(invocation_request.caller, reply)

    # the call is done
    #
    self._remove_invoke_request(invocation_request)

    # The callee's concurrency slot was released above, so a call that was
    # queued on this registration for lack of free concurrency may be
    # forwarded now. Without this, a queued call would stay in the store
    # until some other invocation on the registration returns a result.
    if self._call_store:
        queued_call = self._call_store.get_queued_call(invocation_request.registration)
        if queued_call:
            invocation_sent = self._call(
                queued_call.session,
                queued_call.call,
                queued_call.registration,
                queued_call.authorization,
                True,
            )
            # only actually pop the queued call when we really were
            # able to forward the call now
            if invocation_sent:
                self._call_store.pop_queued_call(invocation_request.registration)