##############################################################################
#
# Crossbar.io
# Copyright (C) typedef int GmbH. All rights reserved.
#
##############################################################################
import os
import threading
import time
import psutil
import six
from autobahn import wamp
from autobahn.wamp.exception import ApplicationError
from autobahn.wamp.types import PublishOptions
from twisted.internet.defer import DeferredList, inlineCallbacks
from twisted.internet.threads import deferToThread
from txaio import make_logger, time_ns
from crossbar.edge.worker.monitor import MONITORS
from crossbar.node.worker import NativeWorkerProcess
from crossbar.worker.controller import WorkerController
__all__ = ("HostMonitor", "HostMonitorProcess")
[docs]
class HostMonitorProcess(NativeWorkerProcess):
[docs]
class HostMonitor(WorkerController):
"""
Gather resource usage statistics and feed them back to the client
"""
[docs]
WORKER_TYPE = "hostmonitor"
[docs]
WORKER_TITLE = "HostMonitor"
def __init__(self, config=None, reactor=None, personality=None):
super(HostMonitor, self).__init__(config=config, reactor=reactor, personality=personality)
[docs]
self._process = psutil.Process(os.getpid())
[docs]
self._is_running = False
@inlineCallbacks
[docs]
def onJoin(self, details):
self.log.info("HostMonitor connected (monitors available: {monitors})", monitors=sorted(MONITORS.keys()))
yield WorkerController.onJoin(self, details, publish_ready=False)
# register monitor procedures
dl = []
for monitor in self._monitors.values():
d = self.register(monitor.get, "{}.get_{}".format(self._prefix, monitor.ID))
dl.append(d)
res = yield DeferredList(dl, fireOnOneErrback=True)
print(res)
self.log.info("HostMonitor {pcnt} procedures registered", pcnt=len(res))
# signal this worker is done with setup and ready
yield self.publish_ready()
[docs]
def onLeave(self, details):
self.log.info("HostMonitor shutting down ..")
self.stop_monitoring()
self.disconnect()
@wamp.register(None)
[docs]
def get_monitoring(self, sensors=None, details=None):
if self._run:
if self._is_running:
status = "running"
else:
status = "starting"
pass
else:
status = "stopped"
if sensors is None:
sensors = list(self._monitors.keys())
else:
sensors = list(set(self._monitors.keys()).intersection(sensors))
monitoring = {
"status": status,
"config": self._config,
"current": {sensor: self._monitors[sensor].get() for sensor in sensors},
}
return monitoring
@wamp.register(None)
[docs]
def start_monitoring(self, config, details=None):
"""
Start monitoring.
:param config: Monitoring configuration.
:type config: dict
"""
self.log.info("HostMonitor start monitoring (config={config})", config=config)
# after the worker is up and running, host monitoring must be started
# (host monitoring can only be started or off, there is nothing to start instances of)
if self._run:
if self._is_running:
raise ApplicationError(
"crossbar.error.already_running", "cannot start host monitoring - monitoring already running"
)
else:
raise ApplicationError(
"crossbar.error.already_running", "cannot start host monitoring - monitoring currently starting"
)
# submonitors specified in the config
monitors = config.get("monitors", {})
if len(monitors) == 0:
raise ApplicationError(
"crossbar.error.invalid_configuration", "cannot start monitoring from empty monitors list"
)
# save config for later access
self._config = config
# sensor polling interval in ms
self._interval = config.get("interval", 500)
self._monitors = {}
for monitor_key, monitor_config in monitors.items():
if not isinstance(monitor_key, six.text_type):
raise ApplicationError(
"crossbar.error.invalid_configuration", 'invalid monitor key type "{}"'.format(type(monitor_key))
)
if monitor_key not in MONITORS.keys():
raise ApplicationError(
"crossbar.error.invalid_configuration",
'unknown monitor type "{}" (available monitors: {})'.format(monitor_key, sorted(MONITORS.keys())),
)
# get submonitor class
klass = MONITORS[monitor_key]
# instantiate submonitor
self._monitors[monitor_key] = klass(monitor_config)
def after_exit_success(_):
self._run = False
def after_exit_error(err):
self._run = False
self._run = True
d = deferToThread(self._loop)
d.addCallbacks(after_exit_success, after_exit_error)
self.log.info(
"HostMonitor loop started _from_ main thread (PID={pid}, thread={tid})",
pid=os.getpid(),
tid=threading.get_ident(),
)
started = {"monitors": sorted(list(self._monitors.keys()))}
topic = "{}.on_mon_started".format(self._uri_prefix)
self.publish(topic, started)
self.log.info("HostMonitor started monitoring (started={started})", started=started)
return started
@wamp.register(None)
[docs]
def stop_monitoring(self, details=None):
self.log.info("HostMonitor stop monitoring ..")
if not self._run:
if self._is_running:
self.log.warn("cannot stop host monitoring - monitoring already told to stop (but still running)")
else:
self.log.info("cannot stop host monitoring - monitoring already stopped")
return None
stopped = {"monitors": sorted(list(self._monitors.keys()))}
topic = "{}.on_monitoring_stopped".format(self._uri_prefix)
self._run = False
self._monitors = {}
self._config = None
self.publish(topic, stopped)
self.log.info("HostMonitor stopped monitoring (stopped={stopped})", stopped=stopped)
return stopped
[docs]
def _loop(self):
# this is running on a background thread! eg you cannot just call self.publish() or self.log()!
self._is_running = True
try:
print(
"HostMonitor entering loop on background thread (PID={pid}, thread={tid})".format(
pid=os.getpid(), tid=threading.get_ident()
)
)
while self._run:
started = time_ns()
self.log.debug(
"HostMonitor gather sensor data on background thread (started={started}, PID={pid}=thread {tid})",
started=started,
pid=os.getpid(),
tid=threading.get_ident(),
)
# poll all configured submonitors and collect the data
hdata = {}
for monitor in self._monitors.values():
hdata[monitor.ID] = monitor.poll()
self._reactor.callFromThread(self._publish, hdata)
# next time we want to loop (takes into account time for monitoring)
next_time = started + self._interval * 10**6
# wake up every 100ms until we want to loop
while self._run and time_ns() < next_time:
time.sleep(0.1)
except Exception as e:
print("HostMonitor ending loop because of exception: {}".format(e))
# mark the thread as "done" when still on the background thread
self._is_running = False
# the deferred return on the main thread from deferToThread will fire its errback
raise
else:
print("HostMonitor ending loop gracefully")
# the deferred return on the main thread from deferToThread will fire its callback
[docs]
def _publish(self, hdata):
self.log.debug(
"HostMonitor publish sensor data on main thread (PID {pid} thread {tid})",
pid=os.getpid(),
tid=threading.get_ident(),
)
# this is running on the main thread: doing the publishes here avoids a couple
# of context switches, and this way the background thread doesn't touch any
# WAMP stuff (which is good in general .. decoupling)
options = PublishOptions(acknowledge=True)
dl = []
for monitor_id, monitor_data in hdata.items():
d = self.publish("{}.on_{}_sample".format(self._uri_prefix, monitor_id), monitor_data, options=options)
dl.append(d)
d = DeferredList(dl)
def done(results):
ok = 0
err = 0
for success, result in results:
if success:
ok += 1
else:
err += 1
self.log.warn("HostMonitor publication failed with - {error}", error=result)
self.log.debug("HostMonitor publish: ok={ok}, err={err}", ok=ok, err=err)
d.addCallback(done)