Source code for crossbar.router.wildcard

#####################################################################################
#
#  Copyright (c) typedef int GmbH
#  SPDX-License-Identifier: EUPL-1.2
#
#####################################################################################

__all__ = ("WildcardMatcher", "WildcardTrieMatcher")


class _Node(dict):
    __slots__ = ("value",)


[docs] class WildcardTrieMatcher(object): def __init__(self):
[docs] self._root = _Node()
[docs] self._values = set()
[docs] def __setitem__(self, key, value): node = self._root for sym in key.split("."): node = node.setdefault(sym, _Node()) node.value = value self._values.add(value)
[docs] def __getitem__(self, key): node = self._root try: for sym in key.split("."): node = node[sym] return node.value except (KeyError, AttributeError): raise KeyError(key)
[docs] def __delitem__(self, key): lst = [] node = self._root try: for k in key.split("."): lst.append((node, k)) node = node[k] self._values.discard(node.value) del node.value except (KeyError, AttributeError): raise KeyError(key) else: for parent, k in reversed(lst): if node or hasattr(node, "value"): break del parent[k] node = parent
[docs] def __contains__(self, key): try: self[key] return True except KeyError: return False
[docs] def values(self): return list(self._values)
[docs] def get(self, key, default=None): try: return self[key] except KeyError: return default
[docs] def iter_matches(self, key): key = key.split(".") key_len = len(key) def rec(node, i): if i == key_len: if hasattr(node, "value"): yield node.value else: nd = node.get(key[i]) if nd is not None: for e in rec(nd, i + 1): yield e nd = node.get("") # wildcard if nd is not None: for e in rec(nd, i + 1): yield e return rec(self._root, 0)
[docs] class WildcardMatcher(object): def __init__(self):
[docs] self._wildcard = {}
[docs] self._wildcard_patterns = {}
[docs] def __setitem__(self, key, value): self._wildcard[key] = value pattern = tuple([part == "" for part in key.split(".")]) pattern_len = len(pattern) # setup map: pattern length -> patterns if pattern_len not in self._wildcard_patterns: self._wildcard_patterns[pattern_len] = {} # setup map: (pattern length, pattern) -> pattern count if pattern not in self._wildcard_patterns[pattern_len]: self._wildcard_patterns[pattern_len][pattern] = 1 else: self._wildcard_patterns[pattern_len][pattern] += 1
[docs] def __delitem__(self, key): # remove actual observation del self._wildcard[key] pattern = tuple([part == "" for part in key.split(".")]) pattern_len = len(pattern) # cleanup if this was the last observation with given pattern self._wildcard_patterns[pattern_len][pattern] -= 1 if not self._wildcard_patterns[pattern_len][pattern]: del self._wildcard_patterns[pattern_len][pattern] # cleanup if this was the last observation with given pattern length if not self._wildcard_patterns[pattern_len]: del self._wildcard_patterns[pattern_len]
[docs] def __getitem__(self, key): return self._wildcard[key]
[docs] def __contains__(self, key): return key in self._wildcard
[docs] def values(self): return self._wildcard.values()
[docs] def get(self, key, default=None): return self._wildcard.get(key, default)
[docs] def iter_matches(self, key): parts = key.split(".") pattern_len = len(parts) if pattern_len in self._wildcard_patterns: for pattern in self._wildcard_patterns[pattern_len]: patterned = ".".join(["" if pattern[i] else parts[i] for i in range(pattern_len)]) if patterned in self._wildcard: yield self._wildcard[patterned]