###############################################################################
#
# Crossbar.io Master
# Copyright (c) typedef int GmbH. Licensed under EUPLv1.2.
#
###############################################################################
import os
import pprint
import re
import threading
import uuid
from collections import OrderedDict
from datetime import datetime
from pathlib import Path
import cbor2
import humanize
import iso8601
import numpy as np
import treq
import txaio
import zlmdb
from autobahn import util, wamp
from autobahn.twisted.wamp import ApplicationSession
from autobahn.util import utcnow
from autobahn.wamp.request import Registration
from autobahn.wamp.types import CallDetails, PublishOptions, RegisterOptions
from cfxdb.globalschema import GlobalSchema
from cfxdb.log import MWorkerLog
from cfxdb.mrealmschema import MrealmSchema
from cfxdb.usage import MasterNodeUsage
from cfxdb.user import User, UserMrealmRole, UserRole
from twisted.internet.defer import inlineCallbacks
from twisted.internet.task import LoopingCall
from twisted.internet.threads import deferToThread
from txaio import time_ns
from crossbar._util import hl, hlid, hltype, hlval
from crossbar.common import checkconfig
from crossbar.common.key import _parse_node_key, _read_release_key, _write_node_key
from crossbar.master.mrealm.mrealm import ManagementRealm, MrealmManager, Node
from crossbar.master.node.user import UserManager
from crossbar.node.main import _get_versions
[docs]
_CFC_DOMAIN = "crossbarfabriccenter.domain."
[docs]
_CFC_MREALM = "crossbarfabriccenter.mrealm."
[docs]
_CFC_USER = "crossbarfabriccenter.user."
[docs]
class DomainManager(object):
    """
    Global domain backend.

    Provides the ``crossbarfabriccenter.domain.*`` management procedures of the master node
    (status, version and license information).
    """

    def __init__(self, session, db, schema):
        """
        :param session: The domain controller session. It provides the logger and the status
            attributes (``_realm``, ``_started``, ``_tick``) reported by :meth:`get_status`.
        :param db: The global database (not used by this backend).
        :param schema: The global database schema.
        """
        self.log = session.log
        self._schema = schema
        self._session = session

    def register(self, session, prefix, options):
        """
        Register the ``@wamp.register`` decorated procedures of this object on ``session``.

        :param session: Session to register the procedures on.
        :param prefix: URI prefix for the procedures (e.g. ``crossbarfabriccenter.domain.``).
        :param options: Registration options.
        :returns: Whatever ``session.register`` returns for an object (awaited by the caller).
        """
        return session.register(self, prefix=prefix, options=options)

    @wamp.register(None)
    def get_status(self, details=None):
        """
        Get global (domain) realm status.

        :procedure: ``crossbarfabriccenter.domain.get_status``

        :param details: Call details.
        :type details: :class:`autobahn.wamp.types.CallDetails`

        :returns: Global system status information: realm, current time, start time,
            humanized uptime and the current master heartbeat tick.
        :rtype: dict
        """
        assert isinstance(details, CallDetails)

        self.log.debug("{klass}.get_status(details={details})", klass=self.__class__.__name__, details=details)

        # Sample the current time once, so the reported "now" and the computed uptime agree.
        now = utcnow()
        started = self._session._started
        uptime_secs = (iso8601.parse_date(now) - iso8601.parse_date(started)).total_seconds()

        return {
            "type": "domain",
            "realm": self._session._realm,
            "now": now,
            "started": started,
            "uptime": humanize.naturaldelta(uptime_secs),
            "tick": self._session._tick,
        }

    @wamp.register(None)
    def get_version(self, details=None):
        """
        Returns CFC software stack version information.

        :procedure: ``crossbarfabriccenter.domain.get_version``

        :param details: Call details.
        :type details: :class:`autobahn.wamp.types.CallDetails`

        :return: Information on software stack versions (as marshalled by ``_get_versions``).
        :rtype: dict
        """
        assert isinstance(details, CallDetails)

        self.log.debug("{klass}.get_version(details={details})", klass=self.__class__.__name__, details=details)

        # FIXME: the reactor is imported here rather than being passed in
        from twisted.internet import reactor

        versions = _get_versions(reactor)
        return versions.marshal()

    @wamp.register(None)
    def get_license(self, details=None):
        """
        Returns CFC software stack license information.

        :procedure: ``crossbarfabriccenter.domain.get_license``

        :param details: Call details.
        :type details: :class:`autobahn.wamp.types.CallDetails`

        :return: License information, including enabled features and limits (currently a
            fixed free-tier license).
        :rtype: dict
        """
        assert isinstance(details, CallDetails)

        self.log.debug("{klass}.get_license(details={details})", klass=self.__class__.__name__, details=details)

        # FIXME: check blockchain for license information
        license_info = {
            "product": "crossbar-free-tier",
            "description": "crossbar free usage tier including 5 managed nodes.",
            "terms": "https://crossbario.com/license",
            "features": {"xbr": False},
            "limits": {"concurrent-nodes": 5},
        }
        return license_info
[docs]
class DomainController(ApplicationSession):
"""
Main CFC backend for a domain (instance of CFC, all CFC nodes connected to one controller node / database).
There is exactly one instance running per master node of a domain.
"""
[docs]
# Class-level txaio logger (a class attribute, so shared by all instances of this class).
log = txaio.make_logger()
[docs]
def _initialize(self):
    """
    Read the master node configuration from ``self.config.extra`` and open the global database.

    Sets ``_release_pubkey_hex``, ``_node_key``, ``_meterurl``, ``_auto_default_mrealm``,
    ``db`` and ``schema``.

    :raises AssertionError: If ``config.extra`` is empty or the database settings are invalid.
    """
    # self.config.controller and self.config.shared are available too;
    # the extra configuration comes from "crossbar/master/node/config.json"
    assert self.config.extra
    cbdir = self.config.extra["cbdir"]

    # Release (public) key
    self._release_pubkey_hex = _read_release_key()["hex"]

    # FIXME: Node key is not loaded yet
    self._node_key = None
    # self._node_key = nacl.signing.SigningKey(self._node_key_hex, encoder=nacl.encoding.HexEncoder)
    # self.config.controller.secmod[1]
    # self.config.controller.call()

    # Metering knobs - FIXME: read and honor all knobs
    meterurl = self.config.extra.get("metering", {}).get("submit", {}).get("url", "${CROSSBAR_METERING_URL}")
    self._meterurl = checkconfig.maybe_from_env("metering.submit.url", meterurl)

    # auto-create default management realm, auto-pair nodes to default mrealm from watching directories
    # for new node public keys. configuration example:
    #
    # "auto_default_mrealm": {
    #     "enabled": true,
    #     "watch_to_pair": [],
    #     "write_pairing_file": true
    # }
    #
    # Default to an empty dict (not ``False``): onJoin calls ``self._auto_default_mrealm.get(...)``,
    # which would raise AttributeError on a bool when the key is missing from the config.
    self._auto_default_mrealm = self.config.extra.get("auto_default_mrealm", None) or {}

    # create database and attach tables to database slots
    #
    config = self.config.extra.get("database", {})

    dbpath = config.get("path", ".db-controller")
    assert isinstance(dbpath, str)
    dbpath = os.path.join(cbdir, dbpath)

    maxsize = config.get("maxsize", 128 * 2**20)
    assert isinstance(maxsize, int)
    # allow maxsize 128kiB to 128GiB
    assert 128 * 1024 <= maxsize <= 128 * 2**30

    # setup global database and schema
    # self.db = zlmdb.Database(dbpath=dbpath, maxsize=maxsize, readonly=False, sync=True, context=self)
    self.db = zlmdb.Database.open(dbpath=dbpath, maxsize=maxsize, readonly=False, sync=True, context=self)
    # the database context is entered manually; there is no matching __exit__ in this method
    self.db.__enter__()
    self.schema = GlobalSchema.attach(self.db)

    self.log.info(
        "{func} {action} [dbpath={dbpath}, maxsize={maxsize}]",
        func=hltype(self._initialize),
        action=hlval("global database newly opened", color="green"),
        dbpath=hlid(dbpath),
        maxsize=hlid(maxsize),
    )
[docs]
async def onJoin(self, details):
    """
    Set up the domain controller once the session has joined its realm.

    In order, this:

    - opens the global database (``_initialize``);
    - makes sure the builtin superuser exists;
    - optionally creates the default management realm and starts watching a directory
      for node public keys to auto-pair;
    - registers the domain, user and mrealm management APIs;
    - starts the master heartbeat loop.

    :param details: Session join details (not used here).
    """
    self._initialize()

    # "auto_default_mrealm" may be absent from the node config (or set to a falsy value). Normalize it
    # to a dict so the ``.get()`` lookups below and in the watch-and-pair loop cannot fail.
    if not self._auto_default_mrealm:
        self._auto_default_mrealm = {}

    self._ensure_superuser()
    self._setup_default_mrealm()

    # DomainManager.get_status reads ``_started`` and ``_tick`` from this session. Set them *before*
    # registering the management APIs, because calls can be routed here as soon as a registration succeeds.
    self._tick = 1
    self._started = utcnow()

    await self._register_management_apis()
    self._start_master_heartbeat()

    self.log.debug('Domain controller ready (realm="{realm}")!', realm=hlid(self._realm))


def _ensure_superuser(self):
    """
    Make sure the magic special builtin "superuser" exists in the global database.

    Looks the user up by email (``"superuser"``) and creates and stores it when missing.
    Sets ``self._superuser_authid`` and ``self._superuser_oid``.
    """
    with self.db.begin(write=True) as txn:
        self._superuser_authid = "superuser"
        self._superuser_oid = self.schema.idx_users_by_email[txn, self._superuser_authid]
        if not self._superuser_oid:
            self._superuser_oid = uuid.uuid4()
            user = User()
            user.oid = self._superuser_oid
            user.email = self._superuser_authid
            user.pubkey = None
            user.registered = datetime.utcnow()
            self.schema.users[txn, user.oid] = user
            self.log.info(
                hl(
                    "SUPERUSER created and stored in database (oid={}, email={})".format(user.oid, user.email),
                    color="green",
                    bold=True,
                )
            )
        else:
            self.log.info(
                hl(
                    "SUPERUSER already exists in database (oid={})".format(self._superuser_oid),
                    color="green",
                    bold=True,
                )
            )


def _setup_default_mrealm(self):
    """
    Create (or verify) the default management realm and set up node watch-to-pair, if enabled.

    This is controlled by ``auto_default_mrealm.enabled`` in the node config. The default management
    realm always has the name ``"default"``. A newly created one is owned by the superuser.
    """
    self._watch_and_pair_lc = None
    self._watch_and_pair_count = None
    self._watch_to_pair = None
    self._default_mrealm_oid = None
    self._default_mrealm_enabled = self._auto_default_mrealm.get("enabled", False)

    if not self._default_mrealm_enabled:
        self.log.info("{action}", action=hl("Default management realm disabled", color="red", bold=True))
        return
    self.log.info("{action}", action=hl("Default management realm enabled", color="green", bold=True))

    # the default management realm (if exists) has always the name "default"!
    mrealm_name = "default"
    with self.db.begin(write=True) as txn:
        self._default_mrealm_oid = self.schema.idx_mrealms_by_name[txn, mrealm_name]
        if self._default_mrealm_oid:
            self.log.debug(
                "ok, default management realm already exists ({oid})", oid=hlid(self._default_mrealm_oid)
            )
            # This should not happen - but better verify that the owner of the default management realm
            # is the current superuser.
            # FIXME: merely logging a warning is likely not enough to address this!
            mrealm = self.schema.mrealms[txn, self._default_mrealm_oid]
            if mrealm.owner != self._superuser_oid:
                self.log.warn(
                    "\n\nWARNING: default management realm not owned by SUPERUSER! [owner={owner}, superuser={superuser}]\n\n",
                    superuser=hlid(self._superuser_oid),
                    owner=hlid(mrealm.owner),
                )
        else:
            # the OID of a newly created default management realm varies locally
            self._default_mrealm_oid = uuid.uuid4()

            new_mrealm = ManagementRealm()
            new_mrealm.oid = self._default_mrealm_oid
            new_mrealm.label = "default mrealm"
            new_mrealm.description = "Default management realm (automatically pre-created)"
            new_mrealm.tags = []
            new_mrealm.name = mrealm_name
            new_mrealm.created = datetime.utcnow()
            new_mrealm.owner = self._superuser_oid
            new_mrealm.cf_router = "cfrouter1"
            new_mrealm.cf_container = "cfcontainer1"

            # store new management realm
            self.schema.mrealms[txn, new_mrealm.oid] = new_mrealm

            # store roles for user that created the management realm
            roles = UserMrealmRole([UserRole.OWNER, UserRole.ADMIN, UserRole.USER, UserRole.GUEST])
            self.schema.users_mrealm_roles[txn, (self._superuser_oid, new_mrealm.oid)] = roles

            self.log.info(
                "{action} [oid={oid}]",
                action=hl("Default management realm created", color="red", bold=True),
                oid=hlid(new_mrealm.oid),
            )

    self._setup_watch_to_pair()


def _setup_watch_to_pair(self):
    """
    Start the periodic scan of ``auto_default_mrealm.watch_to_pair`` for node public keys to auto-pair.

    The scan runs every 10 seconds. Nothing is started (only a warning is logged) when no directory
    is configured or the configured path is not a directory.
    """
    self._watch_to_pair = self._auto_default_mrealm.get("watch_to_pair", None)
    if self._watch_to_pair:
        self.log.info("{action}", action=hl("Watch-to-pair enabled", color="green", bold=True))
        self._watch_to_pair = checkconfig.maybe_from_env(
            "auto_default_mrealm.watch_to_pair", self._watch_to_pair, hide_value=False
        )
        if self._watch_to_pair:
            self._watch_to_pair = Path(self._watch_to_pair)
    else:
        self.log.info("{action}", action=hl("Watch-to-pair disabled", color="red", bold=True))

    if self._watch_to_pair and self._watch_to_pair.is_dir():
        node_dir_pat = self._auto_default_mrealm.get("watch_to_pair_pattern", None)
        if node_dir_pat:
            node_dir_pat = re.compile(node_dir_pat)

        # FIXME: read from config
        follow_symlinks = True

        self._watch_and_pair_lc = LoopingCall(self._watch_and_pair_iteration, node_dir_pat, follow_symlinks)
        self._watch_and_pair_count = 0
        self._watch_and_pair_lc.start(10)
    elif self._watch_to_pair:
        self.log.warn(
            'skipping to watch "{watch_to_pair}" for node auto-pairing - not a directory!',
            watch_to_pair=self._watch_to_pair.absolute(),
        )
    else:
        self.log.warn("skipping to watch for node auto-pairing - no directory configured!")


@inlineCallbacks
def _watch_and_pair_iteration(self, node_dir_pat, follow_symlinks):
    """
    Run one watch-to-pair iteration: scan for node public keys and pair unknown nodes.

    Each newly found node is:

    - stored in the default management realm, owned by the superuser;
    - optionally given a ``key.activate`` pairing file next to its ``key.pub``;
    - announced on ``crossbarfabriccenter.mrealm.on_node_paired``.

    Errors are logged rather than raised, so the driving ``LoopingCall`` keeps running.

    :param node_dir_pat: Compiled regex that node directory names must match, or a falsy value to
        accept all directories.
    :param follow_symlinks: Whether symlinked directories count as node directories.
    """
    self.log.debug(
        '{klass}::watch_and_pair[counter={cnt}, watch_to_pair="{watch_to_pair}"]',
        cnt=hlval(self._watch_and_pair_count),
        watch_to_pair=self._watch_to_pair,
        klass=self.__class__.__name__,
    )
    try:
        self._watch_and_pair_count += 1

        pubkeys = self._scan_node_pubkeys(node_dir_pat, follow_symlinks)
        if not pubkeys:
            self.log.debug(
                '{klass}::watch_and_pair: scanned ({count}) directory "{watch_to_pair}" for node auto-pairing: no nodes found',
                klass=self.__class__.__name__,
                count=hlval(self._watch_and_pair_count),
                watch_to_pair=hlid(self._watch_to_pair),
            )
            return

        # determine actually new pubkeys
        new_pubkeys = []
        with self.db.begin() as txn:
            for cbdir, pubkey, node_id, cluster_ip in pubkeys:
                node_oid = self.schema.idx_nodes_by_pubkey[txn, pubkey]
                if not node_oid:
                    new_pubkeys.append((cbdir, pubkey, node_id, cluster_ip))
                else:
                    node = self.schema.nodes[txn, node_oid]
                    self.log.debug(
                        "{klass}::watch_and_pair: node with pubkey {pubkey} already paired as node {node} to mrealm {mrealm}",
                        klass=self.__class__.__name__,
                        pubkey=hlid(pubkey),
                        node=hlid(node.oid),
                        mrealm=hlid(node.mrealm_oid),
                    )

        self.log.debug(
            '{klass}::watch_and_pair: scanned ({count}) directory "{watch_to_pair}" for node auto-pairing, found {cnt_new_keys} new (in {cnt_keys} total) keys',
            count=hlval(self._watch_and_pair_count),
            watch_to_pair=hlid(self._watch_to_pair),
            cnt_keys=hlval(len(pubkeys)),
            cnt_new_keys=hlval(len(new_pubkeys)),
            klass=self.__class__.__name__,
        )

        # store all new pubkeys
        for cbdir, pubkey, node_id, cluster_ip in new_pubkeys:
            node = Node()
            node.oid = uuid.uuid4()
            node.pubkey = pubkey
            node.cluster_ip = cluster_ip

            # auto-pair newly discovered node to the default management realm, and owned by superuser
            node.owner_oid = self._superuser_oid
            node.mrealm_oid = self._default_mrealm_oid

            if node_id:
                node.authid = node_id
            else:
                node.authid = "node-{}".format(str(node.oid)[:8])

            node.authextra = {
                "node_oid": str(node.oid),
                "cluster_ip": cluster_ip,
                "mrealm_oid": str(node.mrealm_oid),
            }

            # store paired node in database
            with self.db.begin(write=True) as txn:
                self.schema.nodes[txn, node.oid] = node

            if self._auto_default_mrealm.get("write_pairing_file", False):
                self._write_node_activation_file(cbdir, node, pubkey)

            topic = "crossbarfabriccenter.mrealm.on_node_paired"
            payload = node.marshal()
            yield self.publish(topic, payload, options=PublishOptions(acknowledge=True))

            self.log.info(
                "{klass}::watch_and_pair: {action} with pubkey={pubkey}, oid={node_oid}, authid={authid} to default management realm {mrealm}!",
                klass=self.__class__.__name__,
                action=hl("Auto-paired node", color="red", bold=True),
                node_oid=hlid(node.oid),
                authid=hlid(node.authid),
                pubkey=hlval(node.pubkey),
                mrealm=hlid(node.mrealm_oid),
            )
    except Exception:
        self.log.failure()


def _scan_node_pubkeys(self, node_dir_pat, follow_symlinks):
    """
    Collect node public keys from ``key.pub`` files below the watch-to-pair directory.

    Only immediate subdirectories of ``self._watch_to_pair`` are considered (those matching
    ``node_dir_pat``, if given). Each of them is then walked recursively.

    :return: List of ``(directory, pubkey_hex, node_authid, cluster_ip)`` tuples, one per key file
        found. ``node_authid`` and ``cluster_ip`` are ``None`` when absent from the key file.
    """
    with os.scandir(self._watch_to_pair) as entries:
        node_dirs = [
            entry.name
            for entry in entries
            if entry.is_dir(follow_symlinks=follow_symlinks) and (not node_dir_pat or node_dir_pat.match(entry.name))
        ]

    pubkeys = []
    for node_dir in node_dirs:
        for root, _dirs, files in os.walk(os.path.join(self._watch_to_pair, node_dir)):
            if "key.pub" not in files:
                continue
            node_key_file = os.path.join(root, "key.pub")
            if os.path.isfile(node_key_file):
                node_key_tags = _parse_node_key(node_key_file)
                node_key_hex = node_key_tags["public-key-ed25519"]
                node_id = node_key_tags.get("node-authid", None)
                cluster_ip = node_key_tags.get("node-cluster-ip", None)
                pubkeys.append((root, node_key_hex, node_id, cluster_ip))

    self.log.debug(
        "{klass}::watch_and_pair: found {cnt} directories (matching), with {cntk} node keys scanned ..",
        cnt=hl(len(node_dirs)),
        cntk=hl(len(pubkeys)),
        klass=self.__class__.__name__,
    )
    return pubkeys


def _write_node_activation_file(self, cbdir, node, pubkey):
    """
    Write a ``key.activate`` pairing file for a newly paired node into ``cbdir``.

    An existing file is left untouched (a warning is logged instead). ``OSError`` while writing
    is logged, not raised.
    """
    management_url = self._auto_default_mrealm.get("management_url", None)
    if management_url:
        management_url = checkconfig.maybe_from_env(
            "auto_default_mrealm.management_url", management_url, hide_value=False
        )

    activation_code = None
    if self._auto_default_mrealm.get("include_activation_code", False):
        activation_code = util.generate_activation_code()
        # FIXME: store activation & check later

    activation_file = os.path.join(cbdir, "key.activate")
    if os.path.exists(activation_file):
        self.log.warn(
            "{klass}::watch_and_pair: skipped writing {action} to {activation_file} (path already exists)",
            klass=self.__class__.__name__,
            action=hl("Node activation file", color="red", bold=True),
            activation_file=hlval(activation_file),
        )
        return

    file_tags = OrderedDict(
        [
            ("created-at", utcnow()),
            (
                "management-url",
                management_url or "wss://master.xbr.network/ws",
            ),
            ("management-realm", "default"),
            ("management-realm-oid", str(node.mrealm_oid)),
            ("node-oid", str(node.oid)),
            ("node-cluster-ip", node.cluster_ip),
            ("node-authid", str(node.authid)),
            ("activation-code", activation_code),
            ("public-key-ed25519", pubkey),
        ]
    )
    file_msg = "Crossbar.io node activation\n\n"
    try:
        _write_node_key(activation_file, file_tags, file_msg)
    except OSError as e:
        self.log.warn(
            "{klass}::watch_and_pair: failed to write {action} to {activation_file} ({err})",
            klass=self.__class__.__name__,
            action=hl("Node activation file", color="red", bold=True),
            activation_file=hlval(activation_file),
            err=str(e),
        )
    else:
        self.log.info(
            "{klass}::watch_and_pair: {action} written to {activation_file}",
            klass=self.__class__.__name__,
            action=hl("Node activation file", color="red", bold=True),
            activation_file=hlval(activation_file),
        )


async def _register_management_apis(self):
    """
    Create the domain, user and management realm backends and register their procedures.

    Each backend is registered under its URI prefix. Failed registrations are logged as errors.
    """
    self._domain_mgr = DomainManager(self, self.db, self.schema)
    self._user_mgr = UserManager(self, self.db, self.schema)
    self._mrealm_mgr = MrealmManager(self, self.db, self.schema)

    domains = (
        (_CFC_DOMAIN, self._domain_mgr.register),
        (_CFC_USER, self._user_mgr.register),
        (_CFC_MREALM, self._mrealm_mgr.register),
    )  # yapf: disable
    for topic, procedure in domains:
        results = await procedure(self, prefix=topic, options=RegisterOptions(details_arg="details"))
        for reg in results:
            if isinstance(reg, Registration):
                self.log.debug("Registered CFC API <{proc}>", proc=reg.procedure)
            else:
                self.log.error("Error: <{}>".format(reg.value.args[0]))


def _start_master_heartbeat(self):
    """
    Start the master heartbeat loop, which runs ``_master_heartbeat_tick`` every 300 seconds.
    """

    @inlineCallbacks
    def tick():
        try:
            yield self._master_heartbeat_tick()
        except Exception:
            # An exception escaping the function driven by a LoopingCall stops the loop for good.
            # Log it instead, and keep the master heartbeat going.
            self.log.failure()

    # FIXME: make this configurable from master node config
    master_tick_period = 300
    c = LoopingCall(tick)
    c.start(master_tick_period)
    self.log.debug("Master heartbeat loop started .. [period={period} secs]", period=master_tick_period)


@inlineCallbacks
def _master_heartbeat_tick(self):
    """
    Run one iteration of the master heartbeat loop.

    Checks the fill grade of the global database and requests a node shutdown when less than 1%
    is free. Then publishes ``on_tick`` under the domain prefix and advances ``self._tick``.
    """
    # process regular master activities
    started = time_ns()

    # watch master database fill grade
    dbstats = self.db.stats()
    free = dbstats["free"]
    used = dbstats["current_size"]
    if free < 0.01:
        self.log.error(
            "Global master database full! Initiating EMERGENCY SHUTDOWN .. [{free}% free]",
            free=round(free * 100.0, 2),
        )
        # if we don't have at least 1% free space, immediately shutdown the whole node so we don't
        # run into "db full" later possibly half way through the body of a management procedure
        yield self.config.controller.call("crossbar.shutdown")
    elif free < 0.1:
        self.log.warn(
            "Global master database almost full: only {free}% free space left!", free=round(free * 100.0, 2)
        )

    # >>> BEGIN of master heartbeat loop tasks
    # FIXME: usage metering is disabled. Running it tried to open the same mrealm database
    # (".db-mrealm-<oid>") twice within the same process, which zlmdb refuses
    # ("cannot open database ..., already opened in ...").
    if False:
        # 1) aggregate and store usage metering records
        cnt_new = None
        try:
            cnt_new = yield self._do_metering(started)
        except Exception:
            self.log.failure()

        # 2) submit usage metering records to metering service
        if self._meterurl:
            if cnt_new:
                try:
                    yield self._submit_metering(started)
                except Exception:
                    self.log.failure()
        else:
            self.log.warn("Skipping to submit metering records - no metering URL set!")
    # >>> END of master heartbeat loop tasks

    # publish master heartbeat
    ticked = {"now": utcnow(), "tick": self._tick}
    yield self.publish("{}on_tick".format(_CFC_DOMAIN), ticked, options=PublishOptions(acknowledge=True))

    # master heartbeat loop finished!
    duration = int(round((time_ns() - started) / 1000000.0))
    if duration > 500:
        self.log.warn(
            "Master heartbeat loop iteration {tick} finished: excessive run-time of {duration} ms!",
            tick=self._tick,
            duration=duration,
        )
    self.log.debug(
        "Master heartbeat loop iteration {tick} finished in {duration} ms (database {used} used, {free}% free)",
        tick=self._tick,
        free=round(free * 100.0, 2),
        used=humanize.naturalsize(used),
        duration=duration,
    )

    # set next master heartbeat loop iteration sequence number
    self._tick += 1
[docs]
def _first_metering(self, mrealm_id):
    """
    Find the timestamp of the earliest node heartbeat recorded for a management realm.

    Iterates the ``mnode_logs`` table of the realm database (``.db-mrealm-<mrealm_id>`` under
    the node directory) in forward order. Returns the timestamp of the first record whose node,
    looked up in the global database, belongs to ``mrealm_id``.

    :param mrealm_id: OID of the management realm.
    :return: Timestamp of the first matching heartbeat, or ``None`` if there is none.
    """
    mrealm_dbpath = os.path.join(self.config.extra["cbdir"], ".db-mrealm-{}".format(mrealm_id))
    # mrealm_db = zlmdb.Database(dbpath=mrealm_dbpath, readonly=False, context=self)
    mrealm_db = zlmdb.Database.open(dbpath=mrealm_dbpath, readonly=False, context=self)
    mrealm_schema = MrealmSchema.attach(mrealm_db)

    with self.db.begin() as global_txn, mrealm_db.begin() as mrealm_txn:
        heartbeats = mrealm_schema.mnode_logs.select(mrealm_txn, reverse=False, return_values=False)
        for heartbeat_ts, node_oid in heartbeats:
            paired_node = self.schema.nodes[global_txn, node_oid]
            if paired_node and paired_node.mrealm_oid == mrealm_id:
                return heartbeat_ts
    return None
[docs]
def _agg_metering_mnode_logs(self, from_ts, until_ts, mrealm_id, by_node=False):
    """
    Aggregate the raw managed node heartbeat logs (``mnode_logs``) of a management realm.

    Note: This is run on a background thread!

    :param from_ts: Lower bound of the heartbeat timestamps to aggregate (used in ``from_key``).
    :param until_ts: Upper bound of the heartbeat timestamps to aggregate (used in ``to_key``).
    :param mrealm_id: OID of the management realm whose database (``.db-mrealm-<mrealm_id>``) is read.
    :param by_node: If ``True``, return one aggregate per node ID instead of a single total.
    :return: A dict with these entries:

        - ``count``: number of heartbeat records;
        - ``nodes``: sum of the record periods (node-seconds, if periods are in seconds);
        - one entry per worker kind: worker count times the record period.

        With ``by_node=True``, a dict mapping node ID to such dicts.
    """
    dbpath = os.path.join(self.config.extra["cbdir"], ".db-mrealm-{}".format(mrealm_id))
    # db = zlmdb.Database(dbpath=dbpath, readonly=False, context=self)
    db = zlmdb.Database.open(dbpath=dbpath, readonly=False, context=self)
    schema = MrealmSchema.attach(db)

    # worker counters of a node heartbeat record (same names as the result keys), weighted by the period
    worker_kinds = ("routers", "containers", "guests", "proxies", "marketmakers", "hostmonitors", "controllers")

    def new_bucket():
        # fresh, zeroed aggregate (key order: count, nodes, then worker kinds)
        bucket = {"count": 0, "nodes": 0}
        bucket.update((kind, 0) for kind in worker_kinds)
        return bucket

    # either a dict node_id -> bucket (grouped), or a single bucket (total)
    res = {} if by_node else new_bucket()

    with db.begin() as txn:
        for ts, node_id in schema.mnode_logs.select(
            txn,
            from_key=(from_ts, uuid.UUID(bytes=b"\x00" * 16)),
            to_key=(until_ts, uuid.UUID(bytes=b"\xff" * 16)),
            return_values=False,
            reverse=False,
        ):
            rec = schema.mnode_logs[txn, (ts, node_id)]

            if by_node:
                if node_id not in res:
                    res[node_id] = new_bucket()
                bucket = res[node_id]
            else:
                bucket = res

            bucket["count"] += 1
            bucket["nodes"] += rec.period
            for kind in worker_kinds:
                bucket[kind] += getattr(rec, kind) * rec.period

    self.log.debug(
        " Metering: aggregated node logs metering records on thread {thread_id} [mrealm_id={mrealm_id}, from_ts={from_ts}, until_ts={until_ts}]:\n{res}",
        mrealm_id=mrealm_id,
        from_ts=from_ts,
        until_ts=until_ts,
        thread_id=threading.get_ident(),
        res=pprint.pformat(res),
    )
    return res
[docs]
def _agg_metering_mworker_logs(self, from_ts, until_ts, mrealm_id):
    """
    Aggregate the raw managed worker heartbeat logs (``mworker_logs``) of a management realm.

    Note: This is run on a background thread!

    :param from_ts: Lower bound of the heartbeat timestamps to aggregate (used in ``from_key``).
    :param until_ts: Upper bound of the heartbeat timestamps to aggregate (used in ``to_key``).
    :param mrealm_id: OID of the management realm whose database (``.db-mrealm-<mrealm_id>``) is read.
    :return: A dict with these entries:

        - ``count``: number of worker heartbeat records processed;
        - ``nodes``: number of distinct nodes seen;
        - per worker type (pluralized name): the sum of the record periods;
        - ``sessions``: sum over router records of ``router_sessions * period``;
        - ``msgs_*``: per WAMP message type, the sum over router workers of (max - min) of the
          message counter within the interval (assumes the counters are cumulative; TODO confirm).

        ``total`` and ``msgs_error`` are currently not computed and stay 0.
    """
    dbpath = os.path.join(self.config.extra["cbdir"], ".db-mrealm-{}".format(mrealm_id))
    # db = zlmdb.Database(dbpath=dbpath, readonly=False, context=self)
    db = zlmdb.Database.open(dbpath=dbpath, readonly=False, context=self)
    schema = MrealmSchema.attach(db)

    # compute aggregate sum
    res = {
        "count": 0,
        "total": 0,
        "controllers": 0,
        "hostmonitors": 0,
        "routers": 0,
        "containers": 0,
        "guests": 0,
        "proxies": 0,
        "marketmakers": 0,
        "sessions": 0,
        "msgs_call": 0,
        "msgs_yield": 0,
        "msgs_invocation": 0,
        "msgs_result": 0,
        "msgs_error": 0,
        "msgs_publish": 0,
        "msgs_published": 0,
        "msgs_event": 0,
        "msgs_register": 0,
        "msgs_registered": 0,
        "msgs_subscribe": 0,
        "msgs_subscribed": 0,
    }

    # (result key, router heartbeat record attribute) for each tracked WAMP message counter.
    # FIXME: "msgs_error" is not aggregated yet (no error counter is read from the records).
    msg_counters = (
        ("msgs_call", "recv_call"),
        ("msgs_yield", "recv_yield"),
        ("msgs_invocation", "sent_invocation"),
        ("msgs_result", "sent_result"),
        ("msgs_publish", "recv_publish"),
        ("msgs_published", "sent_published"),
        ("msgs_event", "sent_event"),
        ("msgs_register", "recv_register"),
        ("msgs_registered", "sent_registered"),
        ("msgs_subscribe", "recv_subscribe"),
        ("msgs_subscribed", "sent_subscribed"),
    )

    # per router worker (node_id, worker_id): session-seconds and min/max of each message counter
    wres = {}

    # set of nodes we encountered in heartbeats in the time interval
    nodes = set()

    with db.begin() as txn:
        # go over all worker heartbeat records in given time interval ..
        for ts, node_id, worker_id in schema.mworker_logs.select(
            txn,
            from_key=(from_ts, uuid.UUID(bytes=b"\x00" * 16), ""),
            to_key=(until_ts, uuid.UUID(bytes=b"\xff" * 16), ""),
            return_values=False,
            reverse=False,
        ):
            rec = schema.mworker_logs[txn, (ts, node_id, worker_id)]
            assert rec.period

            nodes.add(node_id)

            # type name (str) of the worker
            worker_type = MWorkerLog.WORKER_TYPENAMES[rec.type]

            # increment used "worker seconds" under the pluralized worker type name
            # FIXME: cleanup this hack ..
            if worker_type.endswith("y"):
                # proxy -> proxies
                res["{}ies".format(worker_type[:-1])] += rec.period
            else:
                res["{}s".format(worker_type)] += rec.period

            # increment processed records
            res["count"] += 1

            # for workers of type "router", we compute additional statistics:
            if worker_type == "router":
                wkey = (node_id, worker_id)
                if wkey not in wres:
                    wres[wkey] = {"sessions": 0, "min": {}, "max": {}}
                wstats = wres[wkey]

                # increment used "session seconds" (seconds of connected clients)
                wstats["sessions"] += rec.router_sessions * rec.period

                for key, attr in msg_counters:
                    value = getattr(rec, attr)
                    if value > wstats["max"].get(key, 0):
                        wstats["max"][key] = value
                    # A counter value of 0 is a valid minimum. "Not yet seen" is therefore detected by
                    # key presence, not by falsiness (which would let a later, larger value replace a
                    # genuine minimum of 0).
                    if key not in wstats["min"] or value < wstats["min"][key]:
                        wstats["min"][key] = value

    res["nodes"] = len(nodes)

    for wstats in wres.values():
        res["sessions"] += wstats["sessions"]
        for key, _ in msg_counters:
            res[key] += wstats["max"].get(key, 0) - wstats["min"].get(key, 0)

    self.log.debug(
        " Metering: aggregated {cnt_records} records from mworker_logs: {cnt_nodes} nodes, {cnt} metering records, thread {thread_id} [mrealm_id={mrealm_id}, from_ts={from_ts}, until_ts={until_ts}]",
        cnt=len(res),
        cnt_nodes=res["nodes"],
        cnt_records=res["count"],
        mrealm_id=mrealm_id,
        from_ts=from_ts,
        until_ts=until_ts,
        thread_id=threading.get_ident(),
    )
    return res
@inlineCallbacks
def _do_metering(self, started):
    """
    Aggregate raw heartbeat records into usage metering records, per management realm.

    The time range since the last stored usage record is cut into consecutive
    ``agg_mins``-minute intervals ``(from_ts, until_ts)`` that end before "now". If no
    usage record is stored yet, the range starts at the earliest timestamp returned by
    ``self._first_metering()`` over all management realms, or, if there is none, at
    "now" minus one interval (truncated to the minute).

    For every interval and every management realm, worker and node heartbeat logs are
    aggregated on background threads (``deferToThread``). The result is parsed into a
    ``MasterNodeUsage``, stored in ``schema.usage`` under the key ``(until_ts, mrealm_id)``
    and published to ``<_CFC_DOMAIN>on_meter_usage``.

    A failure to aggregate one management realm is logged and that realm is skipped for the
    interval. A failure to publish the usage event is logged as well, but does not abort the
    run (the record is already stored at that point).

    :param started: Start time of this metering run - only used for logging
        (converted via ``np.datetime64(started, "ns")``; presumably epoch nanoseconds).

    :returns: (via Deferred) Number of usage records stored in this run.
    """
    self.log.debug(
        'Usage metering: aggregating heartbeat records .. [started="{started}", thread_id={thread_id}]',
        started=np.datetime64(started, "ns"),
        thread_id=threading.get_ident(),
    )

    # FIXME: make this tunable from the master node config
    agg_mins = 5
    agg_period = np.timedelta64(agg_mins, "m")

    # determine intervals of (from_ts, until_ts) for which to compute usage data from aggregate raw heartbeat data
    intervals = []
    mrealm_ids = []
    last_ts = None
    with self.db.begin() as txn:
        mrealm_ids.extend(self.schema.mrealms.select(txn, return_values=False))

        # usage keys are (timestamp, mrealm_id): the largest key carries the end of the last metered interval
        for ts, _ in self.schema.usage.select(txn, reverse=True, limit=1, return_values=False):
            last_ts = ts

    if last_ts is None:
        # nothing metered yet: start at the earliest metering data of any management realm ..
        for mrealm_id in mrealm_ids:
            ts = self._first_metering(mrealm_id)
            # a realm without any metering data must not break the comparison below
            if ts is not None and (last_ts is None or ts < last_ts):
                last_ts = ts

        # .. or, without any metering data at all, one aggregation period ago (aligned to the minute)
        if last_ts is None:
            last_ts = np.datetime64(np.datetime64(time_ns(), "ns") - agg_period, "ns")
            last_ts = np.datetime64(last_ts.astype("datetime64[m]"), "ns")

        self.log.debug('Usage metering: first metering timestamp set to "{last_ts}"', last_ts=last_ts)
    else:
        self.log.debug('Usage metering: last metering timestamp stored is "{last_ts}"', last_ts=last_ts)

    # only complete intervals (ending before "now") are aggregated
    until_ts = np.datetime64(last_ts + agg_period, "ns")
    while until_ts < np.datetime64(time_ns(), "ns"):
        intervals.append((last_ts, until_ts))
        self.log.debug(
            'Usage metering: aggregation interval ("{last_ts}", "{until_ts}") appended',
            last_ts=last_ts,
            until_ts=until_ts,
        )
        last_ts = until_ts
        until_ts = np.datetime64(last_ts + agg_period, "ns")

    cnt_new = 0

    if not intervals:
        self.log.debug("Usage metering: no new intervals to aggregate.")
    else:
        self.log.debug(
            "Usage metering: {cnt_intervals} intervals collected - now aggregating on background threads ..",
            cnt_intervals=len(intervals),
        )

    # iterate over intervals to be aggregated ..
    for from_ts, until_ts in intervals:
        # .. and all management realms ..
        # FIXME: aggregate all mrealms for a given interval in one go
        for mrealm_id in mrealm_ids:
            try:
                # aggregate all _worker_ heartbeat data for interval (from_ts, until_ts) and mrealm_id
                mrealm_res = yield deferToThread(self._agg_metering_mworker_logs, from_ts, until_ts, mrealm_id)
                self.log.debug(
                    "Usage metering: worker aggregate result\n{mrealm_res}",
                    mrealm_res=pprint.pformat(mrealm_res),
                )

                # aggregate all _node_ heartbeat data for interval (from_ts, until_ts) and mrealm_id
                mrealm_node_res = yield deferToThread(self._agg_metering_mnode_logs, from_ts, until_ts, mrealm_id)
                self.log.debug(
                    "Usage metering: node aggregate result\n{mrealm_node_res}",
                    mrealm_node_res=pprint.pformat(mrealm_node_res),
                )
            except Exception:
                # skip this management realm for this interval, but keep metering the others
                self.log.failure()
                continue

            # overwrite "nodes" with what the node-based aggregate says (which is correct):
            mrealm_res["nodes"] = mrealm_node_res["nodes"]

            for key in ["routers", "guests", "containers", "proxies", "marketmakers", "controllers"]:
                if mrealm_res[key] != mrealm_node_res[key]:
                    self.log.warn(
                        'Usage metering: node/worker aggregate for worker type "{key}" differ on worker seconds: {worker_res} != {node_res} (worker-based / node-based aggregate)',
                        key=key,
                        worker_res=mrealm_res[key],
                        node_res=mrealm_node_res[key],
                    )

            mrealm_res["timestamp"] = int(until_ts)
            mrealm_res["timestamp_from"] = int(from_ts)
            mrealm_res["mrealm_id"] = str(mrealm_id)
            mrealm_res["seq"] = self._tick

            # FIXME: set master (!) node public key
            # mrealm_res['pubkey'] = self._node_key.verify_key.encode(encoder=nacl.encoding.RawEncoder)

            # usage records start in status "RECEIVED"
            mrealm_res["status"] = 1
            mrealm_res["processed"] = time_ns()

            # parse the raw dict into object
            usage = MasterNodeUsage.parse(mrealm_res)

            # finally, store new usage record in database
            with self.db.begin(write=True) as txn:
                self.schema.usage[txn, (until_ts, mrealm_id)] = usage

            # publish usage metering event: the record is already stored, so a failed publication
            # must not abort metering of the remaining management realms (they would otherwise be
            # skipped for good, since the next run resumes after the latest stored usage key)
            topic = "{}on_meter_usage".format(_CFC_DOMAIN)
            try:
                yield self.publish(
                    topic,
                    str(mrealm_id),
                    str(from_ts),
                    str(until_ts),
                    usage.marshal(),
                    options=PublishOptions(acknowledge=True),
                )
            except Exception as e:
                self.log.warn(
                    'Usage metering: failed to publish usage metering event for mrealm "{mrealm_id}" - "{errmsg}"',
                    mrealm_id=mrealm_id,
                    errmsg=str(e),
                )

            self.log.debug(
                'Usage metering: aggregated and stored period from {from_ts} to {until_ts} ({duration}) usage metering data for mrealm "{mrealm_id}"',
                mrealm_id=mrealm_id,
                duration=str(np.timedelta64(until_ts - from_ts, "s")),
                from_ts=from_ts,
                until_ts=until_ts,
            )
            self.log.debug("Usage metering data:\n{usage}", usage=pprint.pformat(usage.marshal()))

            cnt_new += 1

    self.log.debug("Usage metering: finished aggregating [{cnt_new} intervals stored]", cnt_new=cnt_new)

    return cnt_new
@inlineCallbacks
def _submit_metering(self, started, filter_status=(1,), limit=None):
    """
    Submit stored usage metering records to the metering service.

    Scans ``schema.usage`` starting from 24 hours ago (oldest first). Every record whose
    status is in ``filter_status`` is CBOR-serialized and sent via HTTP/POST (5s timeout)
    to ``self._meterurl``. Each submitted record is written back with its outcome:

    * status ``2`` and the ``metering_id`` parsed from the response body on success;
    * status ``3`` and a ``status_message`` when the request fails, the service does not
      answer with HTTP 200, or the response body is not a 16-byte UUID.

    :param started: Start time of this submission run - only used for logging.
    :param filter_status: Record status values to submit. Defaults to status ``1``, the
        status new usage records are stored with.
    :param limit: Maximum number of usage records scanned. This is applied to the database
        scan, i.e. *before* filtering by status.
    """
    self.log.debug(
        'Usage metering: submitting metering records .. [started="{started}"]',
        started=np.datetime64(started, "ns"),
    )

    # collects keys in table "schema.usage" for metering records to be submitted
    keys = []

    # we only submit meterings up to 24h old: usage keys are (timestamp, mrealm_id), so start the
    # scan at the smallest possible key for that timestamp
    from_ts = np.datetime64(np.datetime64(time_ns(), "ns") - np.timedelta64(24, "h"), "ns")
    from_key = (from_ts, uuid.UUID(bytes=b"\x00" * 16))
    with self.db.begin() as txn:
        for rec in self.schema.usage.select(txn, reverse=False, return_keys=False, from_key=from_key, limit=limit):
            if rec.status in filter_status:
                keys.append((rec.timestamp, rec.mrealm_id))

    tried = 0
    success = 0
    failed = 0

    # submit each metering record (here, sequentially)
    for key in keys:
        # fetch the record we want to submit
        with self.db.begin() as txn:
            rec = self.schema.usage[txn, key]

        self.log.debug(
            'Usage metering: submitting metering record to "{url}"\n{rec}',
            rec=pprint.pformat(rec.marshal()),
            url=self._meterurl,
        )

        # serialize metering data
        # FIXME: sign the metering data with the master node (private) key and send the master node
        #        verify key along (POST body = verify key + signed message)
        data = cbor2.dumps(rec.marshal())

        tried += 1
        metering_id = None
        try:
            # issue the actual (outgoing) HTTP/POST request (with a 5s timeout) ..
            response = yield treq.post(self._meterurl, data=data, timeout=5)

            # .. and receive response body
            rdata = yield treq.content(response)

            if response.code != 200:
                raise Exception(
                    'metering denied by metering service: HTTP response {}, "{}"'.format(response.code, rdata)
                )
        except Exception as e:
            # eg "twisted.internet.error.ConnectionRefusedError"
            rec.status = 3
            rec.status_message = "failed to submit metering record: {}".format(e)
            rec.processed = np.datetime64(time_ns(), "ns")
            failed += 1
            self.log.warn(
                'Usage metering: failed to submit metering record for "{timestamp}" - "{errmsg}"',
                timestamp=rec.timestamp,
                errmsg=str(e),
            )
        else:
            try:
                # the metering service answers with the 16 raw bytes of the assigned metering UUID
                metering_id = uuid.UUID(bytes=rdata)
            except Exception as e:
                rec.status = 3
                rec.status_message = "invalid response from metering service: {}".format(e)
                rec.processed = np.datetime64(time_ns(), "ns")
                failed += 1
                # log (instead of raising) so the record status is still persisted below and the
                # remaining records are still submitted
                self.log.warn(
                    'Usage metering: invalid response from metering service for metering record "{timestamp}" - "{errmsg}"',
                    timestamp=rec.timestamp,
                    errmsg=str(e),
                )
            else:
                rec.status = 2
                rec.processed = np.datetime64(time_ns(), "ns")
                rec.metering_id = metering_id
                success += 1
                self.log.debug(
                    'Usage metering: metering record for "{timestamp}" successfully submitted [meterurl="{meterurl}", metering_id="{metering_id}"]',
                    meterurl=self._meterurl,
                    timestamp=rec.timestamp,
                    metering_id=metering_id,
                    response_length=len(rdata),
                )

        # persist the outcome (new status) of this submission attempt
        with self.db.begin(write=True) as txn:
            self.schema.usage[txn, key] = rec

        self.log.debug(
            'Usage metering: metering record for "{timestamp}" processed with new status {status} [metering_id="{metering_id}"].',
            timestamp=rec.timestamp,
            metering_id=metering_id,
            status=rec.status,
        )

    self.log.debug(
        "Usage metering: finished submitting metering records [tried={tried}, success={success}, failed={failed}]",
        tried=tried,
        success=success,
        failed=failed,
    )