Source code for crossbar.router.realmstore

#####################################################################################
#
#  Copyright (c) typedef int GmbH
#  SPDX-License-Identifier: EUPL-1.2
#
#####################################################################################

from collections import deque
from typing import Any, Dict, List, Optional

from autobahn.util import hltype, hlval
from autobahn.wamp.interfaces import ISession
from autobahn.wamp.message import Publish
from autobahn.wamp.types import CloseDetails, SessionDetails
from txaio import (
    make_logger,
    time_ns,
    use_twisted,  # noqa
)

from crossbar.interfaces import IRealmStore
from crossbar.router.observation import UriObservationMap

__all__ = (
    "RealmStoreMemory",
    "QueuedCall",
)


[docs] class QueuedCall(object):
[docs] __slots__ = ("session", "call", "registration", "authorization")
def __init__(self, session, call, registration, authorization):
[docs] self.session = session
[docs] self.call = call
[docs] self.registration = registration
[docs] self.authorization = authorization
[docs] class RealmStoreMemory(object): """ Memory-backed realm store. """
[docs] log = make_logger()
[docs] STORE_TYPE = "memory"
[docs] GLOBAL_HISTORY_LIMIT = 100
""" The global history limit, in case not overridden. """ def __init__(self, personality, factory, config): """ See the example here: https://github.com/crossbario/crossbar-examples/tree/master/scaling-microservices/queued .. code-block:: json "store": { "type": "memory", "limit": 1000, // global default for limit on call queues "call-queue": [ { "uri": "com.example.compute", "match": "exact", "limit": 10000 // procedure specific call queue limit } ] } """ from twisted.internet import reactor
[docs] self._reactor = reactor
[docs] self._personality = personality
[docs] self._factory = factory
[docs] self._config = config
[docs] self._type = self._config.get("type", None)
assert self._type == self.STORE_TYPE # limit to event history per subscription
[docs] self._limit = self._config.get("limit", self.GLOBAL_HISTORY_LIMIT)
# map of publication ID -> event dict
[docs] self._event_store = {}
# map of publication ID -> set of subscription IDs
[docs] self._event_subscriptions = {}
# map of subscription ID -> (limit, deque(of publication IDs))
[docs] self._event_history = {}
# map: registration.id -> deque( (session, call, registration, authorization) )
[docs] self._queued_calls = {}
[docs] self._running = False
self.log.info( '{func} realm store initialized (type="{stype}")', stype=hlval(self._type), func=hltype(self.__init__) )
[docs] def type(self) -> str: """ Implements :meth:`crossbar._interfaces.IRealmStore.type` """ return self._type
[docs] def is_running(self) -> bool: """ Implements :meth:`crossbar._interfaces.IRealmStore.is_running` """ return self._running
[docs] def start(self): """ Implements :meth:`crossbar._interfaces.IRealmStore.start` """ if self._running: raise RuntimeError("store is already running") else: self.log.info( '{func} starting realm store type="{stype}"', func=hltype(self.start), stype=hlval(self._type), ) # currently nothing to do in stores of type "memory" self._running = True self.log.info("{func} realm store ready!", func=hltype(self.start))
[docs] def stop(self): """ Implements :meth:`crossbar._interfaces.IRealmStore.stop` """ if not self._running: raise RuntimeError("store is not running") else: self.log.info("{func} stopping realm store", func=hltype(self.start)) # currently nothing to do in stores of type "memory" self._running = False
[docs] def store_session_joined(self, session: ISession, details: SessionDetails): """ Implements :meth:`crossbar._interfaces.IRealmStore.store_session_joined` """ self.log.info( "{func} new session joined session={session}, details={details}", func=hltype(self.store_session_joined), session=session, details=details, )
[docs] def store_session_left(self, session: ISession, details: CloseDetails): """ Implements :meth:`crossbar._interfaces.IRealmStore.store_session_left` """ self.log.info( "{func} session left session={session}, details={details}", func=hltype(self.store_session_left), session=session, details=details, )
[docs] def attach_subscription_map(self, subscription_map: UriObservationMap): """ Implements :meth:`crossbar._interfaces.IRealmStore.attach_subscription_map` """ for sub in self._config.get("event-history", []): uri = sub["uri"] match = sub.get("match", "exact") observation, was_already_observed, was_first_observer = subscription_map.add_observer( self, uri=uri, match=match ) subscription_id = observation.id # for in-memory history, we just use a double-ended queue self._event_history[subscription_id] = (sub.get("limit", self._limit), deque())
[docs] def store_event(self, session: ISession, publication_id: int, publish: Publish): """ Implements :meth:`crossbar._interfaces.IRealmStore.store_event` """ assert publication_id not in self._event_store evt = { "time_ns": time_ns(), "realm": session._realm, "session_id": session._session_id, "authid": session._authid, "authrole": session._authrole, "publication": publication_id, "topic": publish.topic, "args": publish.args, "kwargs": publish.kwargs, } self._event_store[publication_id] = evt self.log.debug( "Event {publication_id} stored in {store_type}-store", store_type=self.STORE_TYPE, publication_id=publication_id, )
[docs] def store_event_history(self, publication_id: int, subscription_id: int, receiver: ISession): """ Implements :meth:`crossbar._interfaces.IRealmStore.store_event_history` """ # assert(publication_id in self._event_store) # assert(subscription_id in self._event_history) if publication_id not in self._event_store: self.log.warn( "INTERNAL WARNING: event for publication {publication_id} not in event store", publication_id=publication_id, ) if subscription_id not in self._event_history: self.log.warn( "INTERNAL WARNING: subscription {subscription_id} for publication {publication_id} not in event store", subscription_id=subscription_id, publication_id=publication_id, ) return limit, history = self._event_history[subscription_id] # append event to history history.append(publication_id) if publication_id not in self._event_subscriptions: self._event_subscriptions[publication_id] = set() self._event_subscriptions[publication_id].add(subscription_id) self.log.debug( "Event {publication_id} history stored in {store_type}-store for subscription {subscription_id}", store_type=self.STORE_TYPE, publication_id=publication_id, subscription_id=subscription_id, ) # purge history if over limit if len(history) > limit: # remove leftmost event from history purged_publication_id = history.popleft() # remove the purged publication from event subscriptions self._event_subscriptions[purged_publication_id].remove(subscription_id) self.log.debug( "Event {publication_id} purged from history for subscription {subscription_id}", publication_id=purged_publication_id, subscription_id=subscription_id, ) # if no more event subscriptions exist for publication, remove that too if not self._event_subscriptions[purged_publication_id]: del self._event_subscriptions[purged_publication_id] del self._event_store[purged_publication_id] self.log.debug("Event {publication_id} purged completey", publication_id=purged_publication_id)
[docs] def get_events(self, subscription_id: int, limit: Optional[int] = None): """ Implements :meth:`crossbar._interfaces.IRealmStore.get_events` """ if subscription_id not in self._event_history: return None else: _, history = self._event_history[subscription_id] # at most "limit" events in reverse chronological order res = [] i = -1 if limit is None or limit > len(history): limit = len(history) for _ in range(limit): publication_id = history[i] res.append(self._event_store[publication_id]) i -= 1 return res
[docs] def get_event_history(self, subscription_id: int, from_ts: int, until_ts: int) -> Optional[List[Dict[str, Any]]]: """ Implements :meth:`crossbar._interfaces.IRealmStore.get_event_history` """ raise Exception("not implemented")
[docs] def maybe_queue_call(self, session, call, registration, authorization): """ Implements :meth:`crossbar._interfaces.IRealmStore.maybe_queue_call` """ # FIXME: match this against the config, not just plain accept queueing! if registration.id not in self._queued_calls: self._queued_calls[registration.id] = deque() self._queued_calls[registration.id].append(QueuedCall(session, call, registration, authorization)) return True
[docs] def get_queued_call(self, registration): """ Implements :meth:`crossbar._interfaces.IRealmStore.get_queued_call` """ if registration.id in self._queued_calls and self._queued_calls[registration.id]: return self._queued_calls[registration.id][0]
[docs] def pop_queued_call(self, registration): """ Implements :meth:`crossbar._interfaces.IRealmStore.pop_queued_call` """ if registration.id in self._queued_calls and self._queued_calls[registration.id]: return self._queued_calls[registration.id].popleft()
IRealmStore.register(RealmStoreMemory)