#####################################################################################
#
# Copyright (c) typedef int GmbH
# SPDX-License-Identifier: EUPL-1.2
#
#####################################################################################
import gc
import os
import sys
from datetime import datetime
from pprint import pformat
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet.task import LoopingCall
from twisted.python.failure import Failure
try:
    # Manhole support needs a couple of packages that are optional for Crossbar.
    # So we catch import errors and note those.
    #
    # twisted.conch.manhole_ssh will import even without them, but we _need_ SSH.
    import pyasn1  # noqa
    import cryptography  # noqa
except ImportError as e:
    # The flag must be defined on this path too: start_manhole / get_manhole
    # test it, and leaving it unset would turn "feature unavailable" into a
    # NameError.
    _HAS_MANHOLE = False
    _MANHOLE_MISSING_REASON = str(e)
else:
    _HAS_MANHOLE = True
    _MANHOLE_MISSING_REASON = ""
import txaio
from autobahn import wamp
from autobahn.twisted.wamp import ApplicationSession
from autobahn.util import rtime, utcnow, utcstr
from autobahn.wamp.exception import ApplicationError
from autobahn.wamp.types import ComponentConfig, PublishOptions, RegisterOptions
from twisted.cred import portal
from txaio import make_logger
from crossbar._util import hl, hlid, hltype, hlval
from crossbar.common.processinfo import _HAS_PSUTIL
from crossbar.common.twisted.endpoint import create_listening_port_from_config
if _HAS_PSUTIL:
import psutil
from crossbar.common.monitor import ProcessMonitor
from crossbar.common.processinfo import ProcessInfo
# NOTE: ManholeService itself does not use the optional SSH packages
# (pyasn1, cryptography). NativeProcess.start_manhole checks _HAS_MANHOLE
# before creating an instance, so the class is defined unconditionally.
class ManholeService(object):
    """
    Manhole service running inside a native processes (controller, router, container).

    This class is for _internal_ use within NativeProcess.
    """

    def __init__(self, config, who):
        """
        Ctor.

        :param config: The configuration the manhole service was started with.
        :type config: dict

        :param who: Who triggered creation of this service. ``start_manhole``
            passes the WAMP ``details.caller`` of the request here.
        """
        self.config = config
        self.who = who
        self.status = "starting"
        self.created = datetime.utcnow()

        # Both are filled in by NativeProcess.start_manhole once the
        # listening port is up; until then marshal() reports them as None.
        self.started = None
        self.port = None

    def marshal(self):
        """
        Marshal object information for use with WAMP calls/events.

        :returns: dict -- The marshalled information. ``started`` and
            ``uptime`` are ``None`` while the service has not started yet.
        """
        now = datetime.utcnow()
        return {
            "created": utcstr(self.created),
            "status": self.status,
            "started": utcstr(self.started) if self.started else None,
            "uptime": (now - self.started).total_seconds() if self.started else None,
            "config": self.config,
        }
[docs]
class NativeProcess(ApplicationSession):
    """
    A native Crossbar.io process (currently: controller, router or container).

    When joining the node management realm, every method decorated with
    ``@wamp.register(None)`` is registered as a management procedure under
    ``<uri_prefix>.<method name>`` (see :meth:`onJoin`).
    """

    def onUserError(self, fail, msg):
        """
        Implements :func:`autobahn.wamp.interfaces.ISession.onUserError`

        :class:`ApplicationError` failures (the errors our management
        procedures raise deliberately) are only logged at debug level; any
        other failure is logged as an error together with its traceback.
        """
        if isinstance(fail.value, ApplicationError):
            self.log.debug(
                '{klass}.onUserError(): "{msg}"', klass=self.__class__.__name__, msg=fail.value.error_message()
            )
        else:
            self.log.error(
                '{klass}.onUserError(): "{msg}"\n{traceback}',
                klass=self.__class__.__name__,
                msg=msg,
                traceback=txaio.failure_format_traceback(fail),
            )

    def __init__(self, config=None, reactor=None, personality=None):
        """
        :param config: WAMP component configuration. When given, ``config.realm``
            and (if ``config.extra`` is set) ``config.extra.node`` /
            ``config.extra.worker`` are read.
        :type config: :class:`autobahn.wamp.types.ComponentConfig` or None

        :param reactor: Twisted reactor to use (the global reactor if not given).

        :param personality: Node personality class (defaults to
            :class:`crossbar.personality.Personality`).
        """
        assert config is None or isinstance(config, ComponentConfig)

        # Twisted reactor
        if not reactor:
            from twisted.internet import reactor
        self._reactor = reactor

        # node/software personality
        if personality:
            self.personality = personality
        else:
            from crossbar.personality import Personality

            self.personality = Personality

        # base ctor
        # NOTE(review): super(ApplicationSession, self) starts the MRO lookup
        # *after* ApplicationSession, i.e. ApplicationSession.__init__ itself is
        # skipped - presumably intentional; confirm before changing.
        super(ApplicationSession, self).__init__(config=config)

        self._realm = config.realm if config else None
        self._node_id = config.extra.node if config and config.extra else None
        self._worker_id = config.extra.worker if config and config.extra else None
        self._uri_prefix = "crossbar.worker.{}".format(self._worker_id)

    @property
    def node_id(self):
        """The node ID taken from ``config.extra.node`` (or ``None``)."""
        return self._node_id

    @property
    def worker_id(self):
        """The worker ID taken from ``config.extra.worker`` (or ``None``)."""
        return self._worker_id

    def onConnect(self, do_join=True):
        """
        Initialize per-process state once the transport is connected.

        Sets up the node directory, the process start time and (when psutil
        is available) the process info/monitor helpers, then joins
        ``config.realm`` unless *do_join* is false.
        """
        if not hasattr(self, "cbdir"):
            self.cbdir = self.config.extra.cbdir

        if not hasattr(self, "_uri_prefix"):
            self._uri_prefix = "crossbar.node.{}".format(self.config.extra.node)

        self._started = datetime.utcnow()

        # see: BaseSession
        self.include_traceback = False

        self._manhole_service = None

        if _HAS_PSUTIL:
            self._pinfo = ProcessInfo()
            self._pmonitor = ProcessMonitor(self.WORKER_TYPE, {})
            self._pinfo_monitor = None
            self._pinfo_monitor_seq = 0
        else:
            self._pinfo = None
            self._pmonitor = None
            self._pinfo_monitor = None
            self._pinfo_monitor_seq = None
            self.log.info("Process utilities not available")

        if do_join:
            self.join(self.config.realm)

    @inlineCallbacks
    def onJoin(self, details):
        """
        Called when process has joined the node's management realm.

        Registers all management procedures of this object under the
        process URI prefix.

        :raises ApplicationError: ``crossbar.error.cannot_start`` if any
            procedure could not be registered.
        """
        regs = yield self.register(
            self,
            prefix="{}.".format(self._uri_prefix),
            options=RegisterOptions(details_arg="details"),
        )

        procs = []
        errors = []
        for reg in regs:
            if isinstance(reg, Failure):
                self.log.error("Failed to register management procedure: {f}", f=reg, log_failure=reg)
                errors.append(str(reg))
            else:
                procs.append(reg.procedure)

        if errors:
            raise ApplicationError(
                "crossbar.error.cannot_start", "management API could not be initialized", errors=errors
            )
        else:
            self.log.debug(
                'Ok, registered {len_reg} management procedures on realm "{realm}" [{func}]:\n\n{procs}\n',
                len_reg=hlval(len(regs)),
                realm=hl(self.realm),
                func=hltype(self.onJoin),
                procs=hl(pformat(procs), color="white", bold=True),
            )
            self.log.info(
                'Native worker ready! (worker={worker}, node_id="{node_id}", worker_id="{worker_id}") [{func}]',
                node_id=hlid(self._node_id),
                worker_id=hlid(self._worker_id),
                cbdir=hlval(self.cbdir),
                worker=hlid(self.__class__.__name__),
                func=hltype(self.onJoin),
            )

        returnValue(regs)

    @wamp.register(None)
    def get_cpu_count(self, logical=True, details=None):
        """
        Returns the CPU core count on the machine this process is running on.

        :param logical: If enabled (default), include logical CPU cores ("Hyperthreading"),
            else only count physical CPU cores.
        :type logical: bool

        :returns: The number of CPU cores.
        :rtype: int

        :raises ApplicationError: ``crossbar.error.feature_unavailable`` if psutil is missing.
        """
        if not _HAS_PSUTIL:
            emsg = "unable to get CPU count: required package 'psutil' is not installed"
            self.log.warn(emsg)
            raise ApplicationError("crossbar.error.feature_unavailable", emsg)

        return psutil.cpu_count(logical=logical)

    @wamp.register(None)
    def get_cpus(self, details=None):
        """
        :returns: List of CPU IDs.
        :rtype: list[int]

        :raises ApplicationError: ``crossbar.error.feature_unavailable`` if psutil is missing.
        """
        if not _HAS_PSUTIL:
            emsg = "unable to get CPUs: required package 'psutil' is not installed"
            self.log.warn(emsg)
            raise ApplicationError("crossbar.error.feature_unavailable", emsg)

        return self._pinfo.cpus

    @wamp.register(None)
    def get_cpu_affinity(self, details=None):
        """
        Get CPU affinity of this process.

        :returns: List of CPU IDs the process affinity is set to.
        :rtype: list[int]

        :raises ApplicationError: ``crossbar.error.feature_unavailable`` if psutil is
            missing, ``crossbar.error.runtime_error`` if the affinity cannot be read.
        """
        if not _HAS_PSUTIL:
            emsg = "unable to get CPU affinity: required package 'psutil' is not installed"
            self.log.warn(emsg)
            raise ApplicationError("crossbar.error.feature_unavailable", emsg)

        try:
            p = psutil.Process(os.getpid())
            current_affinity = p.cpu_affinity()
        except Exception as e:
            emsg = "Could not get CPU affinity: {}".format(e)
            # pass the message as a log argument, not as the format string:
            # exception text may contain braces
            self.log.failure("{emsg}", emsg=emsg)
            raise ApplicationError("crossbar.error.runtime_error", emsg)
        else:
            return current_affinity

    @wamp.register(None)
    def set_cpu_affinity(self, cpus, relative=True, details=None):
        """
        Set CPU affinity of this process.

        :param cpus: List of CPU IDs to set process affinity to. When *relative*
            is true, each entry is an index into the CPU ID list returned by
            ``crossbar.worker.<worker_id>.get_cpus`` and must be within
            ``[0 .. N_CPUs - 1]``. Otherwise the entries are used as absolute CPU IDs.
        :type cpus: list[int]

        :param relative: Interpret *cpus* as indexes (default) or as absolute CPU IDs.
        :type relative: bool

        :returns: List of CPU IDs the process affinity is set to.
        :rtype: list[int]

        :raises ApplicationError: ``crossbar.error.feature_unavailable`` (psutil missing
            or platform without CPU affinity), ``crossbar.error.invalid_argument``
            (relative index out of range), ``crossbar.error.runtime_error`` (setting failed).
        """
        if not _HAS_PSUTIL:
            emsg = "Unable to set CPU affinity: required package 'psutil' is not installed"
            self.log.warn(emsg)
            raise ApplicationError("crossbar.error.feature_unavailable", emsg)

        if sys.platform.startswith("darwin"):
            # https://superuser.com/questions/149312/how-to-set-processor-affinity-on-os-x
            emsg = "Unable to set CPU affinity: OSX lacks process CPU affinity"
            self.log.warn(emsg)
            raise ApplicationError("crossbar.error.feature_unavailable", emsg)

        if relative:
            cpu_ids = self._pinfo.cpus
            # reject out-of-range indexes explicitly: negative ones would
            # otherwise silently wrap around, too large ones raise IndexError
            invalid = [i for i in cpus if not isinstance(i, int) or i < 0 or i >= len(cpu_ids)]
            if invalid:
                emsg = "Unable to set CPU affinity: invalid relative CPU index(es) {} (valid: 0 .. {})".format(
                    invalid, len(cpu_ids) - 1
                )
                self.log.warn("{emsg}", emsg=emsg)
                raise ApplicationError("crossbar.error.invalid_argument", emsg)
            _cpus = [cpu_ids[i] for i in cpus]
        else:
            _cpus = cpus

        try:
            p = psutil.Process(os.getpid())
            p.cpu_affinity(_cpus)
            new_affinity = p.cpu_affinity()
            if set(_cpus) != set(new_affinity):
                raise Exception(
                    "CPUs mismatch after affinity setting ({} != {})".format(set(_cpus), set(new_affinity))
                )
        except Exception as e:
            emsg = "Could not set CPU affinity: {}".format(e)
            self.log.failure("{emsg}", emsg=emsg)
            raise ApplicationError("crossbar.error.runtime_error", emsg)
        else:
            # publish info to all but the caller ..
            #
            cpu_affinity_set_topic = "{}.on_cpu_affinity_set".format(self._uri_prefix)
            cpu_affinity_set_info = {
                "cpus": cpus,
                "relative": relative,
                "affinity": new_affinity,
                "who": details.caller,
            }
            self.publish(cpu_affinity_set_topic, cpu_affinity_set_info, options=PublishOptions(exclude=details.caller))

            # .. and return info directly to caller
            #
            return new_affinity

    @wamp.register(None)
    def get_process_info(self, details=None):
        """
        Get process information (open files, sockets, ...).

        :returns: Dictionary with process information.

        :raises ApplicationError: ``crossbar.error.feature_unavailable`` if the
            process utilities are not available.
        """
        self.log.debug("{cls}.get_process_info", cls=self.__class__.__name__)

        if self._pinfo:
            # NOTE(review): may raise psutil.AccessDenied, e.g.
            # PermissionError: [Errno 13] Permission denied: '/proc/14787/io'
            return self._pinfo.get_info()
        else:
            emsg = "Could not retrieve process statistics: required packages not installed"
            raise ApplicationError("crossbar.error.feature_unavailable", emsg)

    @wamp.register(None)
    def get_process_stats(self, details=None):
        """
        Get process statistics (CPU, memory, I/O).

        :returns: Dictionary with process statistics.

        :raises ApplicationError: ``crossbar.error.feature_unavailable`` if the
            process utilities are not available.
        """
        self.log.debug("{cls}.get_process_stats", cls=self.__class__.__name__)

        if self._pinfo:
            return self._pinfo.get_stats()
        else:
            emsg = "Could not retrieve process statistics: required packages not installed"
            raise ApplicationError("crossbar.error.feature_unavailable", emsg)

    @wamp.register(None)
    def get_process_monitor(self, details=None):
        """
        Poll the process monitor once and return its result.

        :raises ApplicationError: ``crossbar.error.feature_unavailable`` if the
            process utilities are not available.
        """
        self.log.debug("{cls}.get_process_monitor", cls=self.__class__.__name__)

        if self._pmonitor:
            return self._pmonitor.poll()
        else:
            emsg = "Could not retrieve process statistics: required packages not installed"
            raise ApplicationError("crossbar.error.feature_unavailable", emsg)

    @wamp.register(None)
    def set_process_stats_monitoring(self, interval, details=None):
        """
        Enable/disable periodic publication of process statistics.

        Statistics are published to ``<uri_prefix>.on_process_stats`` with an
        increasing ``seq`` number; changes of the monitoring interval are
        announced on ``<uri_prefix>.on_process_stats_monitoring_set``
        (excluding the caller).

        :param interval: The monitoring interval in seconds. Set to 0 to disable monitoring.
        :type interval: float

        :raises ApplicationError: ``crossbar.error.feature_unavailable`` if the
            process utilities are not available.
        """
        self.log.debug(
            "{cls}.set_process_stats_monitoring(interval = {interval})", cls=self.__class__.__name__, interval=interval
        )

        if self._pinfo:
            stats_monitor_set_topic = "{}.on_process_stats_monitoring_set".format(self._uri_prefix)

            # stop and remove any existing monitor
            if self._pinfo_monitor:
                # a LoopingCall that died from an exception is no longer running,
                # and stop() on a non-running LoopingCall raises AssertionError
                if self._pinfo_monitor.running:
                    self._pinfo_monitor.stop()
                self._pinfo_monitor = None

                self.publish(stats_monitor_set_topic, 0, options=PublishOptions(exclude=details.caller))

            # possibly start a new monitor
            if interval > 0:
                stats_topic = "{}.on_process_stats".format(self._uri_prefix)

                def publish_stats():
                    stats = self._pinfo.get_stats()
                    self._pinfo_monitor_seq += 1
                    stats["seq"] = self._pinfo_monitor_seq
                    self.publish(stats_topic, stats)

                def on_monitor_error(fail):
                    # the loop stops on the first error; log it instead of
                    # leaving an unhandled error in the Deferred
                    self.log.error(
                        "Process statistics monitor stopped on error: {error}", error=fail.getErrorMessage()
                    )

                self._pinfo_monitor = LoopingCall(publish_stats)
                self._pinfo_monitor.start(interval).addErrback(on_monitor_error)

                self.publish(stats_monitor_set_topic, interval, options=PublishOptions(exclude=details.caller))
        else:
            emsg = "Cannot setup process statistics monitor: required packages not installed"
            raise ApplicationError("crossbar.error.feature_unavailable", emsg)

    @wamp.register(None)
    def trigger_gc(self, details=None):
        """
        Manually trigger a garbage collection in this native process.

        This procedure is registered under

        ``crossbar.node.<node_id>.worker.<worker_id>.trigger_gc``

        for native workers and under

        ``crossbar.node.<node_id>.controller.trigger_gc``

        for node controllers.

        The procedure will publish an event when the garbage collection has finished to

        ``crossbar.node.<node_id>.worker.<worker_id>.on_gc_finished``

        for native workers and

        ``crossbar.node.<node_id>.controller.on_gc_finished``

        for node controllers:

        .. code-block:: javascript

            {
                "requester": {
                    "session_id": 982734923,
                    "auth_id": "bob",
                    "auth_role": "admin"
                },
                "duration": 190
            }

        .. note:: The caller of this procedure will NOT receive the event.
            Currently ``auth_id`` and ``auth_role`` are always ``null``.

        :returns: Time (wall clock) consumed for garbage collection in ms.
        :rtype: int
        """
        self.log.debug("{cls}.trigger_gc", cls=self.__class__.__name__)

        started = rtime()

        # now trigger GC .. this is blocking!
        gc.collect()

        duration = int(round(1000.0 * (rtime() - started)))

        on_gc_finished = "{}.on_gc_finished".format(self._uri_prefix)
        self.publish(
            on_gc_finished,
            {
                "requester": {
                    "session_id": details.caller,
                    # FIXME:
                    "auth_id": None,
                    "auth_role": None,
                },
                "duration": duration,
            },
            options=PublishOptions(exclude=details.caller),
        )

        return duration

    def _create_manhole_factory(self, config):
        """
        Build the SSH (Conch) server factory for a manhole service.

        Credentials come from ``config["users"]``. An entry with a ``"password"``
        is authenticated by username/password, and an entry with a ``"pubkey"``
        (an OpenSSH public key line) by public key. The SSH host key is read
        from ``<cbdir>/ssh_host_rsa_key``.

        :param config: Validated manhole service configuration.
        :type config: dict

        :returns: The configured SSH server factory.

        :raises ApplicationError: ``crossbar.error.invalid_configuration`` if no
            user with usable credentials is configured.
        """
        from twisted.conch.checkers import SSHPublicKeyDatabase
        from twisted.conch.manhole import ColoredManhole
        from twisted.conch.manhole_ssh import ConchFactory, TerminalRealm, TerminalSession
        from twisted.conch.ssh import keys
        from twisted.cred import checkers

        class PublicKeyChecker(SSHPublicKeyDatabase):
            def __init__(self, userKeys):
                self.userKeys = {
                    username: keys.Key.fromString(data=keyData).blob() for username, keyData in userKeys.items()
                }

            def checkKey(self, credentials):
                username = credentials.username.decode("utf8")
                return self.userKeys.get(username) == credentials.blob

        # setup user authentication from the configuration (never from
        # built-in credentials: the manhole grants a Python shell in-process)
        #
        password_checker = checkers.InMemoryUsernamePasswordDatabaseDontUse()
        has_password_users = False
        authorized_keys = {}
        for user in config.get("users", []):
            username = user["user"]
            if "password" in user:
                # SSH credentials carry bytes usernames/passwords
                password_checker.addUser(username.encode("utf8"), user["password"].encode("utf8"))
                has_password_users = True
            if "pubkey" in user:
                authorized_keys[username] = user["pubkey"]

        if not has_password_users and not authorized_keys:
            emsg = "Could not start manhole: no users with credentials configured"
            self.log.error("{emsg}", emsg=emsg)
            raise ApplicationError("crossbar.error.invalid_configuration", emsg)

        # setup manhole namespace
        #
        namespace = {"session": self}

        class PatchedTerminalSession(TerminalSession):
            # get rid of
            # exceptions.AttributeError: TerminalSession instance has no attribute 'windowChanged'
            def windowChanged(self, winSize):
                pass

        rlm = TerminalRealm()
        rlm.sessionFactory = PatchedTerminalSession  # monkey patch
        rlm.chainedProtocolFactory.protocolFactory = lambda _: ColoredManhole(namespace)

        ptl = portal.Portal(rlm)
        if has_password_users:
            ptl.registerChecker(password_checker)
        if authorized_keys:
            ptl.registerChecker(PublicKeyChecker(authorized_keys))

        factory = ConchFactory(ptl)
        factory.noisy = False

        private_key = keys.Key.fromFile(os.path.join(self.cbdir, "ssh_host_rsa_key"))
        public_key = private_key.public()

        factory.publicKeys = {b"ssh-rsa": public_key}
        factory.privateKeys = {b"ssh-rsa": private_key}

        return factory

    @wamp.register(None)
    @inlineCallbacks
    def start_manhole(self, config, details=None):
        """
        Start a Manhole service within this process.

        **Usage:**

        This procedure is registered under

        * ``crossbar.node.<node_id>.worker.<worker_id>.start_manhole`` - for native workers
        * ``crossbar.node.<node_id>.controller.start_manhole`` - for node controllers

        The procedure takes a Manhole service configuration which defines
        a listening endpoint for the service and a list of users. Each user
        is authenticated with its ``password``, or with its ``pubkey`` (an
        OpenSSH public key line) if one is given. For example:

        .. code-block:: javascript

            {
                "endpoint": {
                    "type": "tcp",
                    "port": 6022
                },
                "users": [
                    {
                        "user": "oberstet",
                        "password": "secret"
                    }
                ]
            }

        **Errors:**

        The procedure may raise the following errors:

        * ``crossbar.error.invalid_configuration`` - the provided configuration is invalid
        * ``crossbar.error.already_started`` - the Manhole service is already running (or starting)
        * ``crossbar.error.feature_unavailable`` - the required support packages are not installed
        * ``crossbar.error.cannot_start`` - the SSH server could not be set up (e.g. missing host key)
        * ``crossbar.error.cannot_listen`` - the listening endpoint could not be opened

        **Events:**

        The procedure will publish an event when the service **is starting** to

        * ``crossbar.node.<node_id>.worker.<worker_id>.on_manhole_starting`` - for native workers
        * ``crossbar.node.<node_id>.controller.on_manhole_starting`` - for node controllers

        and publish an event when the service **has started** to

        * ``crossbar.node.<node_id>.worker.<worker_id>.on_manhole_started`` - for native workers
        * ``crossbar.node.<node_id>.controller.on_manhole_started`` - for node controllers

        :param config: Manhole service configuration.
        :type config: dict
        """
        self.log.debug("{cls}.start_manhole(config = {config})", cls=self.__class__.__name__, config=config)

        if not _HAS_MANHOLE:
            emsg = "Could not start manhole: required packages are missing ({})".format(_MANHOLE_MISSING_REASON)
            self.log.error("{emsg}", emsg=emsg)
            raise ApplicationError("crossbar.error.feature_unavailable", emsg)

        if self._manhole_service:
            emsg = "Could not start manhole - already running (or starting)"
            self.log.warn(emsg)
            raise ApplicationError("crossbar.error.already_started", emsg)

        try:
            self.personality.check_manhole(self.personality, config)
        except Exception as e:
            emsg = "Could not start manhole: invalid configuration ({})".format(e)
            self.log.error("{emsg}", emsg=emsg)
            raise ApplicationError("crossbar.error.invalid_configuration", emsg)

        try:
            factory = self._create_manhole_factory(config)
        except ApplicationError:
            raise
        except Exception as e:
            # e.g. missing/unreadable SSH host key in the node directory
            emsg = "Could not start manhole: {}".format(e)
            self.log.error("{emsg}", emsg=emsg)
            raise ApplicationError("crossbar.error.cannot_start", emsg)

        # claim the service slot before the first yield so concurrent calls
        # are rejected as "already_started"
        self._manhole_service = ManholeService(config, details.caller)

        starting_topic = "{}.on_manhole_starting".format(self._uri_prefix)
        starting_info = self._manhole_service.marshal()

        # the caller gets a progressive result ..
        if details.progress:
            details.progress(starting_info)

        # .. while all others get an event
        self.publish(starting_topic, starting_info, options=PublishOptions(exclude=details.caller))

        try:
            self._manhole_service.port = yield create_listening_port_from_config(
                config["endpoint"], self.cbdir, factory, self._reactor, self.log
            )
        except Exception as e:
            self._manhole_service = None
            emsg = "Manhole service endpoint cannot listen: {}".format(e)
            self.log.error("{emsg}", emsg=emsg)
            raise ApplicationError("crossbar.error.cannot_listen", emsg)

        # alright, manhole has started
        self._manhole_service.started = datetime.utcnow()
        self._manhole_service.status = "started"

        started_topic = "{}.on_manhole_started".format(self._uri_prefix)
        started_info = self._manhole_service.marshal()
        self.publish(started_topic, started_info, options=PublishOptions(exclude=details.caller))

        returnValue(started_info)

    @wamp.register(None)
    @inlineCallbacks
    def stop_manhole(self, details=None):
        """
        Stop the Manhole service running in this process.

        This procedure is registered under

        * ``crossbar.node.<node_id>.worker.<worker_id>.stop_manhole`` for native workers and under
        * ``crossbar.node.<node_id>.controller.stop_manhole`` for node controllers

        When no Manhole service is currently running within this process,
        or the Manhole service is already shutting down, a
        ``crossbar.error.not_started`` WAMP error is raised.

        The procedure will publish an event when the service **is stopping** to

        * ``crossbar.node.<node_id>.worker.<worker_id>.on_manhole_stopping`` for native workers and
        * ``crossbar.node.<node_id>.controller.on_manhole_stopping`` for node controllers

        and will publish an event when the service **has stopped** to

        * ``crossbar.node.<node_id>.worker.<worker_id>.on_manhole_stopped`` for native workers and
        * ``crossbar.node.<node_id>.controller.on_manhole_stopped`` for node controllers
        """
        self.log.debug("{cls}.stop_manhole", cls=self.__class__.__name__)

        if not self._manhole_service or self._manhole_service.status != "started":
            emsg = "Cannot stop manhole: not running (or already shutting down)"
            raise ApplicationError("crossbar.error.not_started", emsg)

        self._manhole_service.status = "stopping"

        stopping_topic = "{}.on_manhole_stopping".format(self._uri_prefix)
        stopping_info = None

        # the caller gets a progressive result ..
        if details.progress:
            details.progress(stopping_info)

        # .. while all others get an event
        self.publish(stopping_topic, stopping_info, options=PublishOptions(exclude=details.caller))

        try:
            yield self._manhole_service.port.stopListening()
        except Exception as e:
            # best effort: the service is discarded either way
            self.log.warn("error while stop listening on endpoint: {error}", error=e)

        self._manhole_service = None

        stopped_topic = "{}.on_manhole_stopped".format(self._uri_prefix)
        stopped_info = None
        self.publish(stopped_topic, stopped_info, options=PublishOptions(exclude=details.caller))

        returnValue(stopped_info)

    @wamp.register(None)
    def get_manhole(self, details=None):
        """
        Get current manhole service information.

        :returns: A dict with service information or `None` if the service is not running.

        :raises ApplicationError: ``crossbar.error.feature_unavailable`` if the
            manhole support packages are not installed.
        """
        self.log.debug("{cls}.get_manhole", cls=self.__class__.__name__)

        if not _HAS_MANHOLE:
            emsg = "Could not get manhole: required packages are missing ({})".format(_MANHOLE_MISSING_REASON)
            self.log.error("{emsg}", emsg=emsg)
            raise ApplicationError("crossbar.error.feature_unavailable", emsg)

        if not self._manhole_service:
            return None
        else:
            return self._manhole_service.marshal()

    @wamp.register(None)
    def utcnow(self, details=None):
        """
        Return current time as determined from within this process.

        **Usage:**

        This procedure is registered under

        * ``crossbar.node.<node_id>.worker.<worker_id>.utcnow`` for native workers and under
        * ``crossbar.node.<node_id>.controller.utcnow`` for node controllers

        :returns: Current time (UTC) in UTC ISO 8601 format.
        :rtype: str
        """
        self.log.debug("{cls}.utcnow", cls=self.__class__.__name__)

        return utcnow()

    @wamp.register(None)
    def started(self, details=None):
        """
        Return start time of this process.

        **Usage:**

        This procedure is registered under

        * ``crossbar.node.<node_id>.worker.<worker_id>.started`` for native workers and under
        * ``crossbar.node.<node_id>.controller.started`` for node controllers

        :returns: Start time (UTC) in UTC ISO 8601 format.
        :rtype: str
        """
        self.log.debug("{cls}.started", cls=self.__class__.__name__)

        return utcstr(self._started)

    @wamp.register(None)
    def uptime(self, details=None):
        """
        Return uptime of this process.

        **Usage:**

        This procedure is registered under

        * ``crossbar.node.<node_id>.worker.<worker_id>.uptime`` for native workers and under
        * ``crossbar.node.<node_id>.controller.uptime`` for node controllers

        :returns: Uptime in seconds.
        :rtype: float
        """
        self.log.debug("{cls}.uptime", cls=self.__class__.__name__)

        now = datetime.utcnow()
        return (now - self._started).total_seconds()