#####################################################################################
#
# Copyright (c) typedef int GmbH
# SPDX-License-Identifier: EUPL-1.2
#
#####################################################################################
from typing import Optional
from autobahn import util
from pytrie import StringTrie
from crossbar.router.wildcard import WildcardMatcher, WildcardTrieMatcher
__all__ = ("UriObservationMap", "is_protected_uri")
[docs]
def is_protected_uri(uri, details=None):
"""
Test if the given URI is from a "protected namespace" (starting with `wamp.`
or `crossbar.`). Note that "trusted" clients can access all namespaces.
"""
trusted = details and details.caller_authrole == "trusted"
if trusted:
return False
else:
return uri.startswith("wamp.") or uri.startswith("crossbar.")
class OrderedSet(set):
__slots__ = ("_list",)
def __init__(self):
super(set, self).__init__()
self._list = []
def add(self, item):
super(OrderedSet, self).add(item)
self._list.append(item)
def discard(self, item):
self._list.remove(item)
return super(OrderedSet, self).discard(item)
def __getitem__(self, index):
return self._list[index]
def __iter__(self):
return iter(self._list)
def __reversed__(self):
return reversed(self._list)
class UriObservation(object):
"""
Represents an URI observation maintained by a broker/dealer.
"""
__slots__ = ("uri", "ordered", "extra", "id", "created", "observers")
match: Optional[str] = None
def __init__(self, uri, ordered=False, extra=None):
"""
:param uri: The URI (or URI pattern) for this observation.
:type uri: unicode
"""
# URI (or URI pattern) this observation is created for
self.uri = uri
# flag indicating whether observers should be maintained
# in an ordered set or a regular, unordered set
self.ordered = ordered
# arbitrary, opaque extra data attached to the observation
self.extra = extra
# generate a new ID for the observation
self.id = util.id()
# UTC timestamp this observation was created
self.created = util.utcnow()
# set of observers
if self.ordered:
self.observers = OrderedSet()
else:
self.observers = set()
# arbitrary, opaque extra data attached to the observers of this observation
self.observers_extra = {}
def __repr__(self):
return "{}(id={}, uri={}, match={}, ordered={}, extra={}, created={}, observers={})".format(
self.__class__.__name__,
self.id,
self.uri,
self.match,
self.ordered,
self.extra,
self.created,
self.observers,
)
class ExactUriObservation(UriObservation):
"""
Represents an exact-matching observation.
"""
match = "exact"
class PrefixUriObservation(UriObservation):
"""
Represents a prefix-matching observation.
"""
match = "prefix"
class WildcardUriObservation(UriObservation):
"""
Represents a wildcard-matching observation.
"""
match = "wildcard"
[docs]
class UriObservationMap(object):
"""
Represents the current set of observations maintained by a broker/dealer.
To test: trial crossbar.router.test.test_subscription
"""
[docs]
__slots__ = (
"_ordered",
"_observations_exact",
"_observations_prefix",
"_observations_wildcard",
"_observation_id_to_observation",
)
def __init__(self, ordered=False):
# flag indicating whether observers should be maintained in a SortedSet
# or a regular set (unordered)
[docs]
self._ordered = ordered
# map: URI => ExactUriObservation
[docs]
self._observations_exact = {}
# map: URI => PrefixUriObservation
[docs]
self._observations_prefix = StringTrie()
# map: URI => WildcardUriObservation
if True:
# use a Trie-based implementation (supposed to be faster, but
# otherwise compatible to the naive implementation below)
self._observations_wildcard = WildcardTrieMatcher()
else:
self._observations_wildcard = WildcardMatcher()
# map: observation ID => UriObservation
[docs]
self._observation_id_to_observation = {}
[docs]
def __repr__(self):
return "{}(_ordered={}, _observations_exact={}, _observations_prefix={}, _observations_wildcard={}, _observation_id_to_observation={})".format(
self.__class__.__name__,
self._ordered,
self._observations_exact,
self._observations_prefix,
self._observations_wildcard,
self._observation_id_to_observation,
)
[docs]
def add_observer(self, observer, uri, match="exact", extra=None, observer_extra=None):
"""
Adds a observer to the observation set and returns the respective observation.
:param observer: The observer to add (this can be any opaque object).
:type observer: obj
:param uri: The URI (or URI pattern) to add the observer to add to.
:type uri: unicode
:param match: The matching policy for observing, one of ``"exact"``, ``"prefix"`` or ``"wildcard"``.
:type match: unicode
:returns: A tuple ``(observation, was_already_observed, was_first_observer)``. Here,
``observation`` is an instance of one of ``ExactUriObservation``, ``PrefixUriObservation`` or ``WildcardUriObservation``.
:rtype: tuple
"""
if not isinstance(uri, str):
raise Exception("'uri' should be unicode, not {}".format(type(uri).__name__))
is_first_observer = False
if match == "exact":
# if the exact-matching URI isn't in our map, create a new observation
#
if uri not in self._observations_exact:
self.create_observation(uri, match, extra)
is_first_observer = True
# get the observation
#
observation = self._observations_exact[uri]
elif match == "prefix":
# if the prefix-matching URI isn't in our map, create a new observation
#
if uri not in self._observations_prefix:
self.create_observation(uri, match, extra)
is_first_observer = True
# get the observation
#
observation = self._observations_prefix[uri]
elif match == "wildcard":
# if the wildcard-matching URI isn't in our map, create a new observation
#
if uri not in self._observations_wildcard:
self.create_observation(uri, match, extra)
is_first_observer = True
# get the observation
#
observation = self._observations_wildcard[uri]
else:
raise Exception("invalid match strategy '{}'".format(match))
# add observer if not already in observation
#
if observer not in observation.observers:
was_already_observed = False
# add the observer to the set of observers sitting on the observation
observation.observers.add(observer)
# if there is observer-specific extra data, store it
if observer_extra:
observation.observers_extra[observer] = observer_extra
else:
was_already_observed = True
return observation, was_already_observed, is_first_observer
[docs]
def get_observation(self, uri, match="exact"):
"""
Get a observation (if any) for given URI and match policy.
:param uri: The URI (or URI pattern) to get the observation for.
:type uri: unicode
:param match: The matching policy for observation to retrieve, one of ``"exact"``, ``"prefix"`` or ``"wildcard"``.
:type match: unicode
:returns: The observation (instance of one of ``ExactUriObservation``, ``PrefixUriObservation`` or ``WildcardUriObservation``)
or ``None``.
:rtype: obj or None
"""
if not isinstance(uri, str):
raise Exception("'uri' should be unicode, not {}".format(type(uri).__name__))
if match == "exact":
return self._observations_exact.get(uri, None)
elif match == "prefix":
return self._observations_prefix.get(uri, None)
elif match == "wildcard":
return self._observations_wildcard.get(uri, None)
else:
raise Exception("invalid match strategy '{}'".format(match))
[docs]
def match_observations(self, uri):
"""
Returns the observations matching the given URI. This is the core method called
by a broker to actually dispatch events.
:param uri: The URI to match.
:type uri: unicode
:returns: A list of observations matching the URI. This is a list of instance of
one of ``ExactUriObservation``, ``PrefixUriObservation`` or ``WildcardUriObservation``.
:rtype: list
"""
observations = []
if not isinstance(uri, str):
raise Exception("'uri' should be unicode, not {}".format(type(uri).__name__))
if uri in self._observations_exact:
observations.append(self._observations_exact[uri])
for observation in self._observations_prefix.iter_prefix_values(uri):
observations.append(observation)
for observation in self._observations_wildcard.iter_matches(uri):
observations.append(observation)
return observations
[docs]
def best_matching_observation(self, uri):
"""
Returns the observation that best matches the given URI. This is the core method called
by a dealer to actually forward calls.
:param uri: The URI to match.
:type uri: unicode
:returns: The observation best matching the URI. This is an instance of
``ExactUriObservation``, ``PrefixUriObservation`` or ``WildcardUriObservation`` or ``None``.
:rtype: obj or None
"""
if not isinstance(uri, str):
raise Exception("'uri' should be unicode, not {}".format(type(uri).__name__))
# a exact matching observation is always "best", if any
if uri in self._observations_exact:
return self._observations_exact[uri]
# "second best" is the longest prefix-matching observation, if any
# FIXME: do we want this to take precedence over _any_ wildcard (see below)?
try:
return self._observations_prefix.longest_prefix_value(uri)
except KeyError:
# workaround because of https://bitbucket.org/gsakkis/pytrie/issues/4/string-keys-of-zero-length-are-not
if "" in self._observations_prefix:
return self._observations_prefix[""]
# FIXME: for wildcard observations, when there are multiple matching, we'd
# like to deterministically select the "most selective one"
# We first need a definition of "most selective", and then we need to implement
# this here.
for observation in self._observations_wildcard.iter_matches(uri):
return observation
[docs]
def get_observation_by_id(self, id):
"""
Get a observation by ID.
:param id: The ID of the observation to retrieve.
:type id: int
:returns: The observation for the given ID or ``None``.
:rtype: obj or None
"""
return self._observation_id_to_observation.get(id, None)
[docs]
def create_observation(self, uri, match="exact", extra=None):
"""
Create an observation with no observers.
:param uri: The URI (or URI pattern) to get the observation for.
:type uri: unicode
:param match: The matching policy for observation to retrieve, one of ``"exact"``, ``"prefix"`` or ``"wildcard"``.
:type match: unicode
:returns: The observation (instance of one of ``ExactUriObservation``, ``PrefixUriObservation`` or ``WildcardUriObservation``).
:rtype: obj
"""
if match == "exact":
observation = ExactUriObservation(uri, ordered=self._ordered, extra=extra)
self._observations_exact[uri] = observation
elif match == "prefix":
observation = PrefixUriObservation(uri, ordered=self._ordered, extra=extra)
self._observations_prefix[uri] = observation
elif match == "wildcard":
observation = WildcardUriObservation(uri, ordered=self._ordered, extra=extra)
self._observations_wildcard[uri] = observation
# note observation in observation ID map
#
self._observation_id_to_observation[observation.id] = observation
return observation
[docs]
def drop_observer(self, observer, observation):
"""
Drop a observer from a observation.
:param observer: The observer to drop from the given observation.
:type observer: obj
:param observation: The observation from which to drop the observer. An instance
of ``ExactUriObservation``, ``PrefixUriObservation`` or ``WildcardUriObservation`` previously
created and handed out by this observation map.
:type observation: obj
:param delete: Whether or not to delete the observation if they are the last observer.
:type delete: bool
:returns: A tuple ``(was_observed, was_last_observer)``.
:rtype: tuple
"""
was_last_observer = False
if observer in observation.observers:
was_observed = True
# remove observer from observation
#
observation.observers.discard(observer)
# discard observer-level extra data (if any)
#
if observer in observation.observers_extra:
del observation.observers_extra[observer]
# no more observers on this observation!
#
if not observation.observers:
was_last_observer = True
else:
# observer wasn't on this observation
was_observed = False
return was_observed, was_last_observer
[docs]
def delete_observation(self, observation):
"""
Delete the observation from the map.
:param observation: The observation which to remove from the map. An instance
of ``ExactUriObservation``, ``PrefixUriObservation`` or ``WildcardUriObservation`` previously
created and handed out by this observation map.
:type observation: obj
:rtype: None
"""
if observation.observers:
raise ValueError("Can't delete an observation with current observers.")
if observation.match == "exact":
del self._observations_exact[observation.uri]
elif observation.match == "prefix":
del self._observations_prefix[observation.uri]
elif observation.match == "wildcard":
del self._observations_wildcard[observation.uri]
else:
# should not arrive here
raise Exception("logic error")
del self._observation_id_to_observation[observation.id]