Source code for crossbar.router.broker

#####################################################################################
#
#  Copyright (c) typedef int GmbH
#  SPDX-License-Identifier: EUPL-1.2
#
#####################################################################################

import copy

import txaio
from autobahn import util
from autobahn.exception import PayloadExceededError
from autobahn.wamp import message, role, types
from autobahn.wamp.exception import ApplicationError
from autobahn.wamp.message import (
    _URI_PAT_LOOSE_EMPTY,
    _URI_PAT_LOOSE_LAST_EMPTY,
    _URI_PAT_LOOSE_NON_EMPTY,
    _URI_PAT_STRICT_EMPTY,
    _URI_PAT_STRICT_LAST_EMPTY,
    _URI_PAT_STRICT_NON_EMPTY,
)
from txaio import make_logger

from crossbar._util import hlflag, hlid, hltype
from crossbar.router import NotAttached, RouterOptions
from crossbar.router.observation import UriObservationMap

__all__ = ("Broker",)


class RetainedEvent(object):
    __slots__ = (
        "publish",
        "publisher",
        "publisher_authid",
        "publisher_authrole",
    )

    def __init__(self, publish, publisher=None, publisher_authid=None, publisher_authrole=None):
        self.publish = publish
        self.publisher = publisher
        self.publisher_authid = publisher_authid
        self.publisher_authrole = publisher_authrole


class SubscriptionExtra(object):
    __slots__ = ("retained_events",)

    def __init__(self, retained_events=None):
        self.retained_events = retained_events or []


[docs] class Broker(object): """ Basic WAMP broker. """
[docs] log = make_logger()
def __init__(self, router, reactor, options=None): """ :param router: The router this broker is part of. :type router: Object that implements :class:`crossbar.router.interfaces.IRouter`. :param options: Router options. :type options: Instance of :class:`crossbar.router.types.RouterOptions`. """
[docs] self._router = router
[docs] self._reactor = reactor
[docs] self._options = options or RouterOptions()
# generator for WAMP request IDs
[docs] self._request_id_gen = util.IdGenerator()
# subscription map managed by this broker
[docs] self._subscription_map = UriObservationMap()
# map: session -> set of subscriptions (needed for detach)
[docs] self._session_to_subscriptions = {}
# check all topic URIs with strict rules
[docs] self._option_uri_strict = self._options.uri_check == RouterOptions.URI_CHECK_STRICT
# supported features from "WAMP Advanced Profile"
[docs] self._role_features = role.RoleBrokerFeatures( publisher_identification=True, pattern_based_subscription=True, session_meta_api=True, subscription_meta_api=True, subscriber_blackwhite_listing=True, publisher_exclusion=True, subscription_revocation=True, event_retention=True, payload_transparency=True, payload_encryption_cryptobox=True, )
# store for event history if self._router._store: self._event_store = self._router._store else: self._event_store = None # if there is a store, let the store attach itself to all the subscriptions # it is configured to track if self._event_store: self._event_store.attach_subscription_map(self._subscription_map)
[docs] def attach(self, session): """ Implements :func:`crossbar.router.interfaces.IBroker.attach` """ if session not in self._session_to_subscriptions: self._session_to_subscriptions[session] = set() else: raise Exception("session with ID {} already attached".format(session._session_id))
[docs] def detach(self, session): """ Implements :func:`crossbar.router.interfaces.IBroker.detach` """ if session in self._session_to_subscriptions: for subscription in self._session_to_subscriptions[session]: was_subscribed, was_last_subscriber = self._subscription_map.drop_observer(session, subscription) was_deleted = False # delete it if there are no subscribers and no retained events # if was_subscribed and was_last_subscriber and not subscription.extra.retained_events: was_deleted = True self._subscription_map.delete_observation(subscription) # publish WAMP meta events, if we have a service session, but # not for the meta API itself! # if self._router._realm and self._router._realm.session and not subscription.uri.startswith("wamp."): def _publish(subscription): service_session = self._router._realm.session # FIXME: what about exclude_authid as colleced from forward_for? like we do elsewhere in this file! options = types.PublishOptions( correlation_id=None, correlation_is_anchor=True, correlation_is_last=False ) if was_subscribed: service_session.publish( "wamp.subscription.on_unsubscribe", session._session_id, subscription.id, options=options, ) if was_deleted: options.correlation_is_last = True service_session.publish( "wamp.subscription.on_delete", session._session_id, subscription.id, options=options, ) # we postpone actual sending of meta events until we return to this client session self._reactor.callLater(0, _publish, subscription) del self._session_to_subscriptions[session] else: raise NotAttached("session with ID {} not attached".format(session._session_id))
[docs] def _filter_publish_receivers(self, receivers, publish): """ Internal helper. Does all filtering on a candidate set of Publish receivers, based on all the white/blacklist options in 'publish'. """ # filter by "eligible" receivers # if publish.eligible: # map eligible session IDs to eligible sessions eligible = set() for session_id in publish.eligible: if session_id in self._router._session_id_to_session: eligible.add(self._router._session_id_to_session[session_id]) # filter receivers for eligible sessions receivers = eligible & receivers # if "eligible_authid" we only accept receivers that have the correct authid if publish.eligible_authid: eligible = set() for aid in publish.eligible_authid: eligible.update(self._router._authid_to_sessions.get(aid, set())) receivers = receivers & eligible # if "eligible_authrole" we only accept receivers that have the correct authrole if publish.eligible_authrole: eligible = set() for ar in publish.eligible_authrole: eligible.update(self._router._authrole_to_sessions.get(ar, set())) receivers = receivers & eligible # remove "excluded" receivers # if publish.exclude: # map excluded session IDs to excluded sessions exclude = set() for s in publish.exclude: if s in self._router._session_id_to_session: exclude.add(self._router._session_id_to_session[s]) # filter receivers for excluded sessions if exclude: receivers = receivers - exclude # remove auth-id based receivers if publish.exclude_authid: for aid in publish.exclude_authid: receivers = receivers - self._router._authid_to_sessions.get(aid, set()) # remove authrole based receivers if publish.exclude_authrole: for ar in publish.exclude_authrole: receivers = receivers - self._router._authrole_to_sessions.get(ar, set()) return receivers
[docs] def processPublish(self, session, publish): """ Implements :func:`crossbar.router.interfaces.IBroker.processPublish` """ if self._router.is_traced: if not publish.correlation_id: publish.correlation_id = self._router.new_correlation_id() publish.correlation_is_anchor = True if not publish.correlation_uri: publish.correlation_uri = publish.topic # check topic URI: for PUBLISH, must be valid URI (either strict or loose), and # all URI components must be non-empty if self._option_uri_strict: uri_is_valid = _URI_PAT_STRICT_NON_EMPTY.match(publish.topic) else: uri_is_valid = _URI_PAT_LOOSE_NON_EMPTY.match(publish.topic) if not uri_is_valid: if publish.acknowledge: if self._router.is_traced: publish.correlation_is_last = False self._router._factory._worker._maybe_trace_rx_msg(session, publish) reply = message.Error( message.Publish.MESSAGE_TYPE, publish.request, ApplicationError.INVALID_URI, [ "publish with invalid topic URI '{0}' (URI strict checking {1})".format( publish.topic, self._option_uri_strict ) ], ) reply.correlation_id = publish.correlation_id reply.correlation_uri = publish.topic reply.correlation_is_anchor = False reply.correlation_is_last = True self._router.send(session, reply) else: if self._router.is_traced: publish.correlation_is_last = True self._router._factory._worker._maybe_trace_rx_msg(session, publish) return # disallow publication to topics starting with "wamp." other than for # trusted sessions (that are sessions built into Crossbar.io routing core) # if session._authrole is not None and session._authrole != "trusted": is_restricted = publish.topic.startswith("wamp.") if is_restricted: if publish.acknowledge: if self._router.is_traced: publish.correlation_is_last = False self._router._factory._worker._maybe_trace_rx_msg(session, publish) reply = message.Error( message.Publish.MESSAGE_TYPE, publish.request, ApplicationError.INVALID_URI, ["publish with restricted topic URI '{0}'".format(publish.topic)], ) reply.correlation_id = publish.correlation_id reply.correlation_uri = publish.topic reply.correlation_is_anchor = False reply.correlation_is_last = True self._router.send(session, reply) else: if self._router.is_traced: publish.correlation_is_last = True self._router._factory._worker._maybe_trace_rx_msg(session, publish) return # get subscriptions active on the topic published to # subscriptions = self._subscription_map.match_observations(publish.topic) # check if the event is being persisted by checking if we ourself are among the observers # on _any_ matching subscription # we've been previously added to observer lists on subscriptions ultimately from # node configuration and during the broker starts up. store_event = False if self._event_store: for subscription in subscriptions: if self._event_store in subscription.observers: store_event = True break if store_event: self.log.debug('Persisting event on topic "{topic}"', topic=publish.topic) # check if the event is to be retained by inspecting the 'retain' flag retain_event = False if publish.retain: retain_event = True # go on if (otherwise there isn't anything to do anyway): # # - there are any active subscriptions OR # - the publish is to be acknowledged OR # - the event is to be persisted OR # - the event is to be retained # if not (subscriptions or publish.acknowledge or store_event or retain_event): # the received PUBLISH message is the only one received/sent # for this WAMP action, so mark it as "last" (there is another code path below!) if self._router.is_traced: if publish.correlation_is_last is None: publish.correlation_is_last = True self._router._factory._worker._maybe_trace_rx_msg(session, publish) else: # authorize PUBLISH action # d = self._router.authorize(session, publish.topic, "publish", options=publish.marshal_options()) def on_authorize_success(authorization): # the call to authorize the action _itself_ succeeded. now go on depending on whether # the action was actually authorized or not if not publish.topic.endswith(".on_log"): self.log.debug( '{func}::on_authorize_success() - permission {result} for PUBLISH to topic "{topic}" [realm="{realm}", session_id={session_id}, authid="{authid}", authrole="{authrole}"]', func=hltype(self.processPublish), result=hlflag(authorization["allow"], "GRANTED", "DENIED"), topic=hlid(publish.topic), realm=hlid(session._realm), session_id=hlid(session._session_id), authid=hlid(session._authid), authrole=hlid(session._authrole), ) if not authorization["allow"]: if publish.acknowledge: if self._router.is_traced: publish.correlation_is_last = False self._router._factory._worker._maybe_trace_rx_msg(session, publish) reply = message.Error( message.Publish.MESSAGE_TYPE, publish.request, ApplicationError.NOT_AUTHORIZED, ["session not authorized to publish to topic '{0}'".format(publish.topic)], ) reply.correlation_id = publish.correlation_id reply.correlation_uri = publish.topic reply.correlation_is_anchor = False reply.correlation_is_last = True self._router.send(session, reply) else: if self._router.is_traced: publish.correlation_is_last = True self._router._factory._worker._maybe_trace_rx_msg(session, publish) else: # validate payload (skip in "payload_transparency" mode) if publish.payload is None: try: self._router.validate( "event", publish.topic, publish.args, publish.kwargs, validate=authorization.get("validate", None), ) except Exception as e: if publish.acknowledge: if self._router.is_traced: publish.correlation_is_last = False self._router._factory._worker._maybe_trace_rx_msg(session, publish) reply = message.Error( message.Publish.MESSAGE_TYPE, publish.request, ApplicationError.INVALID_ARGUMENT, [ "publish to topic URI '{0}' with invalid application payload: {1}".format( publish.topic, e ) ], ) reply.correlation_id = publish.correlation_id reply.correlation_uri = publish.topic reply.correlation_is_anchor = False reply.correlation_is_last = True self._router.send(session, reply) else: if self._router.is_traced: publish.correlation_is_last = True self._router._factory._worker._maybe_trace_rx_msg(session, publish) return # new ID for the publication # publication = util.id() # publisher disclosure # if authorization["disclose"]: disclose = True elif publish.topic.startswith("wamp.") or publish.topic.startswith("crossbar."): disclose = True else: disclose = False forward_for = None if disclose: if publish.forward_for: publisher = publish.forward_for[0]["session"] publisher_authid = publish.forward_for[0]["authid"] publisher_authrole = publish.forward_for[0]["authrole"] assert session._session_id is not None forward_for = publish.forward_for + [ { "session": session._session_id, "authid": session._authid, "authrole": session._authrole, } ] else: publisher = session._session_id publisher_authid = session._authid publisher_authrole = session._authrole else: publisher = None publisher_authid = None publisher_authrole = None # skip publisher # if publish.exclude_me is None or publish.exclude_me: me_also = False else: me_also = True # persist event (this is done only once, regardless of the number of subscriptions # the event matches on) # if store_event: self._event_store.store_event(session, publication, publish) # retain event on the topic # if retain_event: retained_event = RetainedEvent(publish, publisher, publisher_authid, publisher_authrole) observation = self._subscription_map.get_observation(publish.topic) if not observation: # No observation, lets make a new one observation = self._subscription_map.create_observation( publish.topic, extra=SubscriptionExtra() ) else: # this can happen if event-history is # enabled on the topic: the event-store # creates an observation before any client # could possible hit the code above if observation.extra is None: observation.extra = SubscriptionExtra() elif not isinstance(observation.extra, SubscriptionExtra): raise Exception("incorrect 'extra' for '{}'".format(publish.topic)) if observation.extra.retained_events: if not publish.eligible and not publish.exclude: observation.extra.retained_events = [retained_event] else: observation.extra.retained_events.append(retained_event) else: observation.extra.retained_events = [retained_event] subscription_to_receivers = {} total_receivers_cnt = 0 # iterate over all subscriptions and determine actual receivers of the event # under the respective subscription. also persist events (independent of whether # there is any actual receiver right now on the subscription) # for subscription in subscriptions: # initial list of receivers are all subscribers on a subscription .. # receivers = subscription.observers receivers = self._filter_publish_receivers(receivers, publish) # if receivers is non-empty, dispatch event .. # receivers_cnt = len(receivers) - (1 if self in receivers else 0) if receivers_cnt: total_receivers_cnt += receivers_cnt subscription_to_receivers[subscription] = receivers # send publish acknowledge before dispatching # if publish.acknowledge: if self._router.is_traced: publish.correlation_is_last = False self._router._factory._worker._maybe_trace_rx_msg(session, publish) reply = message.Published(publish.request, publication) reply.correlation_id = publish.correlation_id reply.correlation_uri = publish.topic reply.correlation_is_anchor = False reply.correlation_is_last = total_receivers_cnt == 0 self._router.send(session, reply) else: if self._router.is_traced and publish.correlation_is_last is None: if total_receivers_cnt == 0: publish.correlation_is_last = True else: publish.correlation_is_last = False # now actually dispatch the events! # for chunked dispatching, this will be filled with deferreds for each chunk # processed. when the complete list of deferreds is done, that means the # event has been sent out to all applicable receivers all_dl = [] if total_receivers_cnt: # list of receivers that should have received the event, but we could not # send the event, since the receiver has disappeared in the meantime vanished_receivers = [] for subscription, receivers in subscription_to_receivers.items(): storing_event = store_event and self._event_store in subscription.observers self.log.debug( "dispatching for subscription={subscription}, storing_event={storing_event}", subscription=subscription, storing_event=storing_event, ) # for pattern-based subscriptions, the EVENT must contain # the actual topic being published to # if subscription.match != message.Subscribe.MATCH_EXACT: topic = publish.topic else: topic = None if publish.payload: msg = message.Event( subscription.id, publication, payload=publish.payload, publisher=publisher, publisher_authid=publisher_authid, publisher_authrole=publisher_authrole, topic=topic, transaction_hash=publish.transaction_hash, enc_algo=publish.enc_algo, enc_key=publish.enc_key, enc_serializer=publish.enc_serializer, forward_for=forward_for, ) else: msg = message.Event( subscription.id, publication, args=publish.args, kwargs=publish.kwargs, publisher=publisher, publisher_authid=publisher_authid, publisher_authrole=publisher_authrole, topic=topic, transaction_hash=publish.transaction_hash, forward_for=forward_for, ) # if the publish message had a correlation ID, this will also be the # correlation ID of the event message sent out msg.correlation_id = publish.correlation_id msg.correlation_uri = publish.topic msg.correlation_is_anchor = False msg.correlation_is_last = False msg._router_internal = ( session._session_id, # publisher session._authid, session._authrole, ) chunk_size = self._options.event_dispatching_chunk_size if chunk_size and len(receivers) > chunk_size: self.log.debug( "chunked dispatching to {receivers_size} with chunk_size={chunk_size}", receivers_size=len(receivers), chunk_size=chunk_size, ) else: self.log.debug( "unchunked dispatching to {receivers_size} receivers", receivers_size=len(receivers), ) # note that we're using one code-path for both chunked and unchunked # dispatches; the *first* chunk is always done "synchronously" (before # the first call-later) so "un-chunked mode" really just means we know # we'll be done right now and NOT do a call_later... # a Deferred that fires when all chunks are done all_d = txaio.create_future() all_dl.append(all_d) # all the event messages are the same except for the last one, which # needs to have the "is_last" flag set if we're doing a trace if self._router.is_traced: last_msg = copy.deepcopy(msg) last_msg.correlation_id = msg.correlation_id last_msg.correlation_uri = msg.correlation_uri last_msg.correlation_is_anchor = False last_msg.correlation_is_last = True def _notify_some(receivers): # we do a first pass over the proposed chunk of receivers # because not all of them will have a transport, and if this # will be the last chunk of receivers we need to figure out # which event is last... receivers_this_chunk = [] for receiver in receivers[:chunk_size]: if receiver._session_id and receiver._transport: receivers_this_chunk.append(receiver) else: vanished_receivers.append(receiver) receivers = receivers[chunk_size:] # XXX note there's still going to be some edge-cases here .. if # we are NOT the last chunk, but all the next chunk's receivers # (could be only 1 in that chunk!) vanish before we run our next # batch, then a "last" event will never go out ... # we now actually do the deliveries, but now we know which # receiver is the last one if receivers or not self._router.is_traced: # NOT the last chunk (or we're not traced so don't care) for receiver in receivers_this_chunk: # send out WAMP msg to peer try: self._router.send(receiver, msg) except PayloadExceededError as e: self.log.warn( "could not dispatch event to receiver {receiver} (subscription_id={subscription_id}, publication_id={publication_id}): {err}", receiver=receiver._session_id, subscription_id=subscription.id, publication_id=publication, err=str(e), ) if self._event_store or storing_event: self._event_store.store_event_history( publication, subscription.id, receiver ) else: # last chunk, so last receiver gets the different message for receiver in receivers_this_chunk[:-1]: try: self._router.send(receiver, msg) except PayloadExceededError as e: self.log.warn( "could not dispatch event to receiver {receiver} (subscription_id={subscription_id}, publication_id={publication_id}): {err}", receiver=receiver._session_id, subscription_id=subscription.id, publication_id=publication, err=str(e), ) if self._event_store or storing_event: self._event_store.store_event_history( publication, subscription.id, receiver ) # send last receiver a different message (and guard: we might have zero valid receivers in the last chunk!) if receivers_this_chunk: receiver = receivers_this_chunk[-1] try: self._router.send(receiver, last_msg) except PayloadExceededError as e: self.log.warn( "could not dispatch event to receiver {receiver} (subscription_id={subscription_id}, publication_id={publication_id}): {err}", receiver=receiver._session_id, subscription_id=subscription.id, publication_id=publication, err=str(e), ) if self._event_store or storing_event: self._event_store.store_event_history( publication, subscription.id, receiver ) if receivers: # still more to do .. return txaio.call_later(0, _notify_some, receivers) else: # all done! resolve all_d, which represents all receivers # to a single subscription matching the event txaio.resolve(all_d, None) _notify_some( [ recv for recv in receivers if (me_also or recv != session) and recv != self._event_store ] ) return txaio.gather(all_dl) def on_authorize_error(err): """ the call to authorize the action _itself_ failed (note this is different from the call to authorize succeed, but the authorization being denied) """ self.log.failure("Authorization failed", failure=err) if publish.acknowledge: if self._router.is_traced: publish.correlation_is_last = False self._router._factory._worker._maybe_trace_rx_msg(session, publish) reply = message.Error( message.Publish.MESSAGE_TYPE, publish.request, ApplicationError.AUTHORIZATION_FAILED, [ "failed to authorize session for publishing to topic URI '{0}': {1}".format( publish.topic, err.value ) ], ) reply.correlation_id = publish.correlation_id reply.correlation_uri = publish.topic reply.correlation_is_anchor = False self._router.send(session, reply) else: if self._router.is_traced: publish.correlation_is_last = True self._router._factory._worker._maybe_trace_rx_msg(session, publish) txaio.add_callbacks(d, on_authorize_success, on_authorize_error)
[docs] def processSubscribe(self, session, subscribe): """ Implements :func:`crossbar.router.interfaces.IBroker.processSubscribe` """ if self._router.is_traced: if not subscribe.correlation_id: subscribe.correlation_id = self._router.new_correlation_id() subscribe.correlation_is_anchor = True subscribe.correlation_is_last = False if not subscribe.correlation_uri: subscribe.correlation_uri = subscribe.topic self._router._factory._worker._maybe_trace_rx_msg(session, subscribe) # check topic URI: for SUBSCRIBE, must be valid URI (either strict or loose), and all # URI components must be non-empty for normal subscriptions, may be empty for # wildcard subscriptions and must be non-empty for all but the last component for # prefix subscriptions # if self._option_uri_strict: if subscribe.match == "wildcard": uri_is_valid = _URI_PAT_STRICT_EMPTY.match(subscribe.topic) elif subscribe.match == "prefix": uri_is_valid = _URI_PAT_STRICT_LAST_EMPTY.match(subscribe.topic) else: uri_is_valid = _URI_PAT_STRICT_NON_EMPTY.match(subscribe.topic) else: if subscribe.match == "wildcard": uri_is_valid = _URI_PAT_LOOSE_EMPTY.match(subscribe.topic) elif subscribe.match == "prefix": uri_is_valid = _URI_PAT_LOOSE_LAST_EMPTY.match(subscribe.topic) else: uri_is_valid = _URI_PAT_LOOSE_NON_EMPTY.match(subscribe.topic) if not uri_is_valid: reply = message.Error( message.Subscribe.MESSAGE_TYPE, subscribe.request, ApplicationError.INVALID_URI, ["subscribe for invalid topic URI '{0}'".format(subscribe.topic)], ) reply.correlation_id = subscribe.correlation_id reply.correlation_uri = subscribe.topic reply.correlation_is_anchor = False reply.correlation_is_last = True self._router.send(session, reply) return # authorize SUBSCRIBE action # d = self._router.authorize(session, subscribe.topic, "subscribe", options=subscribe.marshal_options()) def on_authorize_success(authorization): if not subscribe.topic.endswith(".on_log"): self.log.debug( '{func}::on_authorize_success() - permission {result} for SUBSCRIBE to topic "{topic}" [realm="{realm}", session_id={session_id}, authid="{authid}", authrole="{authrole}"]', func=hltype(self.processSubscribe), result=hlflag(authorization["allow"], "GRANTED", "DENIED"), topic=hlid(subscribe.topic), realm=hlid(session._realm), session_id=hlid(session._session_id), authid=hlid(session._authid), authrole=hlid(session._authrole), ) if not authorization["allow"]: # error reply since session is not authorized to subscribe replies = [ message.Error( message.Subscribe.MESSAGE_TYPE, subscribe.request, ApplicationError.NOT_AUTHORIZED, [ 'session (session_id={}, authid="{}", authrole="{}") is not authorized to subscribe to topic "{}" on realm "{}"'.format( session._session_id, session._authid, session._authrole, subscribe.topic, session._realm, ) ], ) ] replies[0].correlation_id = subscribe.correlation_id replies[0].correlation_uri = subscribe.topic replies[0].correlation_is_anchor = False replies[0].correlation_is_last = True else: # if the session disconencted while the authorization was # being checked, stop if session not in self._session_to_subscriptions: # if the session *really* disconnected, it won't have # a _session_id any longer, so we double-check if session._session_id is not None: self.log.error( "Session '{session_id}' still appears valid, but isn't in subscription map", session_id=session._session_id, ) self.log.info( "Session vanished while subscribing to '{topic}'", topic=subscribe.topic, ) return # ok, session authorized to subscribe. now get the subscription # subscription, was_already_subscribed, is_first_subscriber = self._subscription_map.add_observer( session, subscribe.topic, subscribe.match, extra=SubscriptionExtra() ) if not was_already_subscribed: self._session_to_subscriptions[session].add(subscription) # publish WAMP meta events, if we have a service session, but # not for the meta API itself! # if ( self._router._realm and self._router._realm.session and not subscription.uri.startswith("wamp.") and (is_first_subscriber or not was_already_subscribed) ): has_follow_up_messages = True exclude_authid = None if subscribe.forward_for: exclude_authid = [ff["authid"] for ff in subscribe.forward_for] def _publish(): service_session = self._router._realm.session if exclude_authid or self._router.is_traced: options = types.PublishOptions( correlation_id=subscribe.correlation_id, correlation_is_anchor=False, correlation_is_last=False, exclude_authid=exclude_authid, ) else: options = None if is_first_subscriber: subscription_details = { "id": subscription.id, "created": subscription.created, "uri": subscription.uri, "match": subscription.match, } service_session.publish( "wamp.subscription.on_create", session._session_id, subscription_details, options=options, ) if not was_already_subscribed: if options: options.correlation_is_last = True service_session.publish( "wamp.subscription.on_subscribe", session._session_id, subscription.id, options=options, ) # we postpone actual sending of meta events until we return to this client session self._reactor.callLater(0, _publish) else: has_follow_up_messages = False # check for retained events # def _get_retained_event(): if subscription.extra.retained_events: retained_events = list(subscription.extra.retained_events) retained_events.reverse() for retained_event in retained_events: authorized = False if not retained_event.publish.exclude and not retained_event.publish.eligible: authorized = True elif ( session._session_id in retained_event.publish.eligible and session._session_id not in retained_event.publish.exclude ): authorized = True if authorized: publication = util.id() if retained_event.publish.payload: msg = message.Event( subscription.id, publication, payload=retained_event.publish.payload, enc_algo=retained_event.publish.enc_algo, enc_key=retained_event.publish.enc_key, enc_serializer=retained_event.publish.enc_serializer, publisher=retained_event.publisher, publisher_authid=retained_event.publisher_authid, publisher_authrole=retained_event.publisher_authrole, retained=True, ) else: msg = message.Event( subscription.id, publication, args=retained_event.publish.args, kwargs=retained_event.publish.kwargs, publisher=retained_event.publisher, publisher_authid=retained_event.publisher_authid, publisher_authrole=retained_event.publisher_authrole, retained=True, ) msg.correlation_id = subscribe.correlation_id msg.correlation_uri = subscribe.topic msg.correlation_is_anchor = False msg.correlation_is_last = False return [msg] return [] # acknowledge subscribe with subscription ID # replies = [message.Subscribed(subscribe.request, subscription.id)] replies[0].correlation_id = subscribe.correlation_id replies[0].correlation_uri = subscribe.topic replies[0].correlation_is_anchor = False replies[0].correlation_is_last = False if subscribe.get_retained: replies.extend(_get_retained_event()) replies[-1].correlation_is_last = not has_follow_up_messages # send out reply to subscribe requestor # [self._router.send(session, reply) for reply in replies] def on_authorize_error(err): """ the call to authorize the action _itself_ failed (note this is different from the call to authorize succeed, but the authorization being denied) """ self.log.failure("Authorization of 'subscribe' for '{uri}' failed", uri=subscribe.topic, failure=err) reply = message.Error( message.Subscribe.MESSAGE_TYPE, subscribe.request, ApplicationError.AUTHORIZATION_FAILED, [ "failed to authorize session for subscribing to topic URI '{0}': {1}".format( subscribe.topic, err.value ) ], ) reply.correlation_id = subscribe.correlation_id reply.correlation_uri = subscribe.topic reply.correlation_is_anchor = False reply.correlation_is_last = True self._router.send(session, reply) txaio.add_callbacks(d, on_authorize_success, on_authorize_error)
[docs] def processUnsubscribe(self, session, unsubscribe): """ Implements :func:`crossbar.router.interfaces.IBroker.processUnsubscribe` """ if self._router.is_traced: if not unsubscribe.correlation_id: unsubscribe.correlation_id = self._router.new_correlation_id() unsubscribe.correlation_is_anchor = True unsubscribe.correlation_is_last = False # get subscription by subscription ID or None (if it doesn't exist on this broker) # subscription = self._subscription_map.get_observation_by_id(unsubscribe.subscription) if subscription: if self._router.is_traced and not unsubscribe.correlation_uri: unsubscribe.correlation_uri = subscription.uri if session in subscription.observers: was_subscribed, was_last_subscriber, has_follow_up_messages = self._unsubscribe( subscription, session, unsubscribe ) reply = message.Unsubscribed(unsubscribe.request) if self._router.is_traced: reply.correlation_uri = subscription.uri reply.correlation_is_last = not has_follow_up_messages else: # subscription exists on this broker, but the session that wanted to unsubscribe wasn't subscribed # reply = message.Error( message.Unsubscribe.MESSAGE_TYPE, unsubscribe.request, ApplicationError.NO_SUCH_SUBSCRIPTION ) if self._router.is_traced: reply.correlation_uri = reply.error reply.correlation_is_last = True else: # subscription doesn't even exist on this broker # reply = message.Error( message.Unsubscribe.MESSAGE_TYPE, unsubscribe.request, ApplicationError.NO_SUCH_SUBSCRIPTION ) if self._router.is_traced: reply.correlation_uri = reply.error reply.correlation_is_last = True if self._router.is_traced: self._router._factory._worker._maybe_trace_rx_msg(session, unsubscribe) reply.correlation_id = unsubscribe.correlation_id reply.correlation_is_anchor = False self._router.send(session, reply)
[docs] def _unsubscribe(self, subscription, session, unsubscribe=None): # drop session from subscription observers # was_subscribed, was_last_subscriber = self._subscription_map.drop_observer(session, subscription) was_deleted = False if was_subscribed and was_last_subscriber and not subscription.extra.retained_events: self._subscription_map.delete_observation(subscription) was_deleted = True # remove subscription from session->subscriptions map # if was_subscribed: self._session_to_subscriptions[session].discard(subscription) # publish WAMP meta events, if we have a service session, but # not for the meta API itself! # if ( self._router._realm and self._router._realm.session and not subscription.uri.startswith("wamp.") and (was_subscribed or was_deleted) ): has_follow_up_messages = True exclude_authid = None if unsubscribe and unsubscribe.forward_for: exclude_authid = [ff["authid"] for ff in unsubscribe.forward_for] def _publish(): service_session = self._router._realm.session if unsubscribe and (exclude_authid or self._router.is_traced): options = types.PublishOptions( correlation_id=unsubscribe.correlation_id, correlation_is_anchor=False, correlation_is_last=False, exclude_authid=exclude_authid, ) else: options = None if was_subscribed: service_session.publish( "wamp.subscription.on_unsubscribe", session._session_id, subscription.id, options=options, ) if was_deleted: if options: options.correlation_is_last = True service_session.publish( "wamp.subscription.on_delete", session._session_id, subscription.id, options=options, ) # we postpone actual sending of meta events until we return to this client session self._reactor.callLater(0, _publish) else: has_follow_up_messages = False return was_subscribed, was_last_subscriber, has_follow_up_messages
[docs] def removeSubscriber(self, subscription, session, reason=None): """ Actively unsubscribe a subscriber session from a subscription. """ was_subscribed, was_last_subscriber, _ = self._unsubscribe(subscription, session) if ( "subscriber" in session._session_roles and session._session_roles["subscriber"] and session._session_roles["subscriber"].subscription_revocation ): reply = message.Unsubscribed(0, subscription=subscription.id, reason=reason) reply.correlation_uri = subscription.uri self._router.send(session, reply) return was_subscribed, was_last_subscriber