Source code for crossbar.node.main

#####################################################################################
#
#  Copyright (c) typedef int GmbH
#  SPDX-License-Identifier: EUPL-1.2
#
#####################################################################################

import argparse
import importlib
import json
import os
import platform
import signal
import sys
from importlib.metadata import version
from importlib.resources import files

import click
import txaio

txaio.use_twisted()  # noqa

from autobahn.websocket.protocol import WebSocketProtocol
from autobahn.websocket.utf8validator import Utf8Validator
from autobahn.websocket.xormasker import XorMaskerNull
from twisted.internet.defer import inlineCallbacks
from twisted.logger import globalLogPublisher
from twisted.python.reflect import qual
from txaio import failure_format_traceback, make_logger, set_global_log_level, start_logging

from crossbar._logging import LogLevel, make_logfile_observer, make_stderr_observer, make_stdout_observer
from crossbar._util import _add_cbdir_config, _add_debug_options, _add_log_arguments, hl, hlid, hltype, term_print
from crossbar.common.checkconfig import InvalidConfigException, color_json
from crossbar.common.key import _maybe_generate_node_key, _read_node_key, _read_release_key
from crossbar.node.template import Templates
from crossbar.worker import main as worker_main

try:
    import vmprof

    _HAS_VMPROF = True
except ImportError:
    _HAS_VMPROF = False

try:
    import psutil

    _HAS_PSUTIL = True
except ImportError:
    _HAS_PSUTIL = False

_HAS_COLOR_TERM = False
try:
    import colorama

    # https://github.com/tartley/colorama/issues/48
    term = None
    if sys.platform == "win32" and "TERM" in os.environ:
        term = os.environ.pop("TERM")

    colorama.init()
    _HAS_COLOR_TERM = True

    if term:
        os.environ["TERM"] = term

except ImportError:
    pass

__all__ = ("main",)

_PID_FILENAME = "node.pid"


def _get_version(name_or_module):
    if isinstance(name_or_module, str):
        name_or_module = importlib.import_module(name_or_module)

    if hasattr(name_or_module, "__version__"):
        v = name_or_module.__version__
    elif hasattr(name_or_module, "version"):
        v = name_or_module.version
    else:
        try:
            v = version(name_or_module.__name__)
        except:
            # eg flatbuffers when run from single file EXE (pyinstaller): https://github.com/google/flatbuffers/issues/5299
            v = "?.?.?"

    if type(v) in (tuple, list):
        return ".".join(str(x) for x in v)
    elif isinstance(v, str):
        return v
    else:
        raise RuntimeError('unexpected type {} for version in module "{}"'.format(type(v), name_or_module))


def _check_pid_exists(pid):
    """
    Check if a process with given PID exists.

    :returns: ``True`` if a process exists.
    :rtype: bool
    """
    if sys.platform == "win32":
        if _HAS_PSUTIL:
            # http://pythonhosted.org/psutil/#psutil.pid_exists
            return psutil.pid_exists(pid)
        else:
            # On Windows, this can only be done with native code (like via win32com, ctypes or psutil).
            # We use psutil.
            raise Exception("cannot check if process with PID exists - package psutil not installed")
    else:
        # Unix-like OS
        # http://stackoverflow.com/a/568285/884770
        try:
            os.kill(pid, 0)
        except OSError:
            return False
        else:
            return True


def _is_crossbar_process(cmdline):
    """
    Returns True if the cmdline passed appears to really be a running
    crossbar instance.
    """
    if len(cmdline) > 1 and "crossbar" in cmdline[1]:
        return True
    if len(cmdline) > 0 and cmdline[0] == "crossbar-controller":
        return True
    return False


def _check_is_running(cbdir):
    """
    Check if a Crossbar.io node is already running on a Crossbar.io node directory.

    :param cbdir: The Crossbar.io node directory to check.
    :type cbdir: str

    :returns: The PID of the running Crossbar.io controller process or ``None``
    :rtype: int or None
    """
    log = make_logger()

    remove_PID_type = None
    remove_PID_reason = None

    fp = os.path.join(cbdir, _PID_FILENAME)

    if os.path.isfile(fp):
        with open(fp) as fd:
            pid_data_str = fd.read()
            try:
                pid_data = json.loads(pid_data_str)
                pid = int(pid_data["pid"])
            except ValueError:
                remove_PID_type = "corrupt"
                remove_PID_reason = "corrupt .pid file"
            else:
                if pid == os.getpid():
                    # the process ID is our own -- this happens often when the Docker container is
                    # shut down uncleanly
                    return None
                elif sys.platform == "win32" and not _HAS_PSUTIL:
                    # when on Windows, and we can't actually determine if the PID exists,
                    # just assume it exists
                    return pid_data
                else:
                    pid_exists = _check_pid_exists(pid)
                    if pid_exists:
                        if _HAS_PSUTIL:
                            # additionally check this is actually a crossbar process
                            p = psutil.Process(pid)
                            cmdline = p.cmdline()
                            if not _is_crossbar_process(cmdline):
                                nicecmdline = " ".join(cmdline)
                                if len(nicecmdline) > 76:
                                    nicecmdline = nicecmdline[:38] + " ... " + nicecmdline[-38:]
                                log.info('"{fp}" points to PID {pid} which is not a crossbar process:', fp=fp, pid=pid)
                                log.info("  {cmdline}", cmdline=nicecmdline)
                                log.info("Verify manually and either kill {pid} or delete {fp}", pid=pid, fp=fp)
                                return None
                        return pid_data
                    else:
                        remove_PID_type = "stale"
                        remove_PID_reason = "pointing to non-existing process with PID {}".format(pid)

    if remove_PID_type:
        # If we have to remove a PID, do it here.
        try:
            os.remove(fp)
        except:
            log.info(
                ("Could not remove {pidtype} Crossbar.io PID file ({reason}) {fp} - {log_failure}"),
                pidtype=remove_PID_type,
                reason=remove_PID_reason,
                fp=fp,
            )
        else:
            log.info(
                "{pidtype} Crossbar.io PID file ({reason}) {fp} removed",
                pidtype=remove_PID_type.title(),
                reason=remove_PID_reason,
                fp=fp,
            )

    return None


def _run_command_legal(options, reactor, personality, verbose=True):
    """
    Subcommand "crossbar legal".
    """
    docs = [personality.LICENSE, personality.LICENSES_OSS]

    print(hl("*" * 120, bold=True, color="yellow"))
    for package, resource_name in docs:
        filename = str(files(package) / resource_name)
        filepath = os.path.abspath(filename)
        print(hl("   " + filepath + " :\n", bold=False, color="yellow"))
        with open(filepath) as f:
            legal = f.read()
            print(hl(legal, bold=True, color="white"))
        print(hl("*" * 120, bold=True, color="yellow"))


class Versions(object):
    def __init__(self):
        self.executable = ""
        self.platform = ""
        self.machine = ""
        self.py_ver = ""
        self.py_ver_string = ""
        self.py_ver_detail = ""
        self.py_is_frozen = ""
        self.pip_ver = ""
        self.tx_ver = ""
        self.tx_loc = ""
        self.txaio_ver = ""
        self.ab_ver = ""
        self.ab_loc = ""
        self.utf8_ver = ""
        self.utf8_loc = ""
        self.xor_ver = ""
        self.xor_loc = ""
        self.json_ver = ""
        self.msgpack_ver = ""
        self.cbor_ver = ""
        self.ubjson_ver = ""
        self.flatbuffers_ver = ""
        self.lmdb_ver = ""
        self.crossbar_ver = ""
        self.numpy_ver = ""
        self.zlmdb_ver = ""
        self.cfxdb_ver = ""
        self.xbr_ver = ""
        self.release_pubkey = ""
        self.supported_serializers = ""

    def marshal(self):
        obj = {}
        obj["executable"] = self.executable
        obj["platform"] = self.platform
        obj["machine"] = self.machine
        obj["py_ver"] = self.py_ver
        obj["py_ver_string"] = self.py_ver_string
        obj["py_ver_detail"] = self.py_ver_detail
        obj["py_is_frozen"] = self.py_is_frozen
        obj["pip_ver"] = self.pip_ver
        obj["tx_ver"] = self.tx_ver
        obj["tx_loc"] = self.tx_loc
        obj["txaio_ver"] = self.txaio_ver
        obj["ab_ver"] = self.ab_ver
        obj["ab_loc"] = self.ab_loc
        obj["utf8_ver"] = self.utf8_ver
        obj["utf8_loc"] = self.utf8_loc
        obj["xor_ver"] = self.xor_ver
        obj["xor_loc"] = self.xor_loc
        obj["json_ver"] = self.json_ver
        obj["msgpack_ver"] = self.msgpack_ver
        obj["cbor_ver"] = self.cbor_ver
        obj["ubjson_ver"] = self.ubjson_ver
        obj["flatbuffers_ver"] = self.flatbuffers_ver
        obj["lmdb_ver"] = self.lmdb_ver
        obj["crossbar_ver"] = self.crossbar_ver
        obj["numpy_ver"] = self.numpy_ver
        obj["zlmdb_ver"] = self.zlmdb_ver
        obj["cfxdb_ver"] = self.cfxdb_ver
        obj["xbr_ver"] = self.xbr_ver
        obj["release_pubkey"] = self.release_pubkey
        obj["supported_serializers"] = self.supported_serializers
        return obj


def _get_versions(reactor):
    v = Versions()

    v.executable = os.path.realpath(sys.executable)

    v.platform = platform.platform()
    v.machine = platform.machine()

    # Python
    v.py_ver = ".".join([str(x) for x in list(sys.version_info[:3])])
    v.py_ver_string = "%s" % sys.version.replace("\n", " ")

    if "pypy_version_info" in sys.__dict__:
        v.py_ver_detail = "{}-{}".format(
            platform.python_implementation(), ".".join(str(x) for x in sys.pypy_version_info[:3])
        )
    else:
        v.py_ver_detail = platform.python_implementation()

    # Pyinstaller (frozen EXE)
    v.py_is_frozen = getattr(sys, "frozen", False)

    # Twisted / Reactor
    v.tx_ver = "%s-%s" % (_get_version("twisted"), reactor.__class__.__name__)
    v.tx_loc = "%s" % qual(reactor.__class__)

    # txaio
    v.txaio_ver = _get_version("txaio")

    # Autobahn
    v.ab_ver = _get_version("autobahn")
    v.ab_loc = "%s" % qual(WebSocketProtocol)

    # UTF8 Validator
    s = qual(Utf8Validator)
    if "wsaccel" in s:
        v.utf8_ver = "wsaccel-%s" % _get_version("wsaccel")
    elif s.startswith("autobahn"):
        v.utf8_ver = "autobahn"
    else:
        # could not detect UTF8 validator type/version
        v.utf8_ver = "?"
    v.utf8_loc = "%s" % qual(Utf8Validator)

    # XOR Masker
    s = qual(XorMaskerNull)
    if "wsaccel" in s:
        v.xor_ver = "wsaccel-%s" % _get_version("wsaccel")
    elif s.startswith("autobahn"):
        v.xor_ver = "autobahn"
    else:
        # could not detect XOR masker type/version
        v.xor_ver = "?"
    v.xor_loc = "%s" % qual(XorMaskerNull)

    # JSON Serializer
    supported_serializers = ["JSON"]
    from autobahn.wamp.serializer import JsonObjectSerializer

    json_ver = JsonObjectSerializer.JSON_MODULE.__name__

    # If it's just 'json' then it's the stdlib one...
    if json_ver == "json":
        v.json_ver = "stdlib"
    else:
        v.json_ver = (json_ver + "-%s") % _get_version(json_ver)

    # MsgPack Serializer
    try:
        from autobahn.wamp.serializer import MsgPackObjectSerializer

        msgpack = MsgPackObjectSerializer.MSGPACK_MODULE
        v.msgpack_ver = "{}-{}".format(msgpack.__name__, _get_version(msgpack))
        supported_serializers.append("MessagePack")
    except ImportError:
        pass

    # CBOR Serializer
    try:
        from autobahn.wamp.serializer import CBORObjectSerializer

        cbor = CBORObjectSerializer.CBOR_MODULE
        v.cbor_ver = "{}-{}".format(cbor.__name__, _get_version(cbor))
        supported_serializers.append("CBOR")
    except ImportError:
        pass

    # UBJSON Serializer
    try:
        from autobahn.wamp.serializer import UBJSONObjectSerializer

        ubjson = UBJSONObjectSerializer.UBJSON_MODULE
        v.ubjson_ver = "{}-{}".format(ubjson.__name__, _get_version(ubjson))
        supported_serializers.append("UBJSON")
    except ImportError:
        pass

    # Flatbuffers Serializer
    try:
        from autobahn.wamp.serializer import FlatBuffersObjectSerializer

        flatbuffers = FlatBuffersObjectSerializer.FLATBUFFERS_MODULE
        v.flatbuffers_ver = "{}-{}".format(flatbuffers.__name__, _get_version(flatbuffers))
        supported_serializers.append("Flatbuffers")
    except ImportError:
        pass

    v.supported_serializers = supported_serializers

    # LMDB
    try:
        import lmdb  # noqa

        lmdb_lib_ver = ".".join([str(x) for x in lmdb.version()])
        v.lmdb_ver = "{}/lmdb-{}".format(_get_version(lmdb), lmdb_lib_ver)
    except ImportError:
        pass

    # crossbar
    try:
        import crossbar  # noqa

        v.crossbar_ver = _get_version(crossbar)
    except ImportError:
        pass

    # zlmdb
    try:
        import zlmdb  # noqa

        v.zlmdb_ver = _get_version(zlmdb)
    except ImportError:
        pass

    # cfxdb
    try:
        import cfxdb  # noqa

        v.cfxdb_ver = _get_version(cfxdb)
    except ImportError:
        pass

    # xbr
    try:
        import xbr  # noqa

        v.xbr_ver = _get_version(xbr)
    except ImportError:
        pass

    # numpy
    try:
        import numpy  # noqa

        v.numpy_ver = _get_version(numpy)
    except ImportError:
        pass

    # pip
    try:
        import pip  # noqa

        v.pip_ver = _get_version(pip)
    except ImportError:
        pass

    # Release Public Key
    from crossbar.common.key import _read_release_key

    release_pubkey = _read_release_key()
    v.release_pubkey = release_pubkey["base64"]

    return v


def _run_command_version(options, reactor, personality):
    """
    Subcommand "crossbar version".
    """
    log = make_logger()

    v = _get_versions(reactor)

    def decorate(text, fg="white", bg=None, bold=True):
        return click.style(text, fg=fg, bg=bg, bold=bold)

    for line in personality.BANNER.splitlines():
        log.info(hl(line, color="yellow", bold=True))
    log.info("")
    log.info(" Crossbar.io        : {ver}", ver=decorate(v.crossbar_ver))
    log.info("   txaio            : {ver}", ver=decorate(v.txaio_ver))
    log.info("   Autobahn         : {ver}", ver=decorate(v.ab_ver))
    log.info("     UTF8 Validator : {ver}", ver=decorate(v.utf8_ver))
    log.info("     XOR Masker     : {ver}", ver=decorate(v.xor_ver))
    log.info("     JSON Codec     : {ver}", ver=decorate(v.json_ver))
    log.info("     MsgPack Codec  : {ver}", ver=decorate(v.msgpack_ver))
    log.info("     CBOR Codec     : {ver}", ver=decorate(v.cbor_ver))
    log.info("     UBJSON Codec   : {ver}", ver=decorate(v.ubjson_ver))
    log.info("     FlatBuffers    : {ver}", ver=decorate(v.flatbuffers_ver))
    log.info("   Twisted          : {ver}", ver=decorate(v.tx_ver))
    log.info("   LMDB             : {ver}", ver=decorate(v.lmdb_ver))
    log.info("   Python           : {ver}/{impl}", ver=decorate(v.py_ver), impl=decorate(v.py_ver_detail))
    log.info("   PIP              : {ver}", ver=decorate(v.pip_ver))
    log.info("   NumPy            : {ver}", ver=decorate(v.numpy_ver))
    log.info("   zLMDB            : {ver}", ver=decorate(v.zlmdb_ver))
    log.info("   CFXDB            : {ver}", ver=decorate(v.cfxdb_ver))
    log.info("   XBR              : {ver}", ver=decorate(v.xbr_ver))
    log.info(" Frozen executable  : {py_is_frozen}", py_is_frozen=decorate("yes" if v.py_is_frozen else "no"))
    log.info(" Operating system   : {ver}", ver=decorate(v.platform))
    log.info(" Host machine       : {ver}", ver=decorate(v.machine))
    log.info(" Release key        : {release_pubkey}", release_pubkey=decorate(v.release_pubkey))
    log.info("")


def _run_command_keys(options, reactor, personality):
    """
    Subcommand "crossbar keys".
    """
    log = make_logger()

    # Generate a new node key pair (2 files), load and check
    _maybe_generate_node_key(options.cbdir)

    # Print keys

    # Release (public) key
    release_pubkey = _read_release_key()

    # Node key
    node_key = _read_node_key(options.cbdir, private=options.private)

    if options.private:
        key_title = "Crossbar.io Node PRIVATE Key"
    else:
        key_title = "Crossbar.io Node PUBLIC Key"

    log.info("")
    log.info("{key_title}", key_title=hl("Crossbar Software Release Key", color="yellow", bold=True))
    log.info("base64: {release_pubkey}", release_pubkey=release_pubkey["base64"])
    log.info(release_pubkey["qrcode"].strip())
    log.info("")
    log.info("{key_title}", key_title=hl(key_title, color="yellow", bold=True))
    log.info("hex: {node_key}", node_key=node_key["hex"])
    log.info(node_key["qrcode"].strip())
    log.info("")


def _run_command_init(options, reactor, personality):
    """
    Subcommand "crossbar init".
    """
    log = make_logger()

    if options.appdir is None:
        options.appdir = "."

    options.appdir = os.path.abspath(options.appdir)
    cbdir = os.path.join(options.appdir, ".crossbar")

    if os.path.exists(options.appdir):
        log.warn("Application directory '{appdir}' already exists!", appdir=options.appdir)
    else:
        try:
            os.mkdir(options.appdir)
        except Exception as e:
            raise Exception("could not create application directory '{}' ({})".format(options.appdir, e))
        else:
            log.info("Crossbar.io application directory '{appdir}' created", appdir=options.appdir)

    log.info("Initializing application directory '{options.appdir}' ..", options=options)

    get_started_hint = Templates.init(options.appdir, template="default")

    _maybe_generate_node_key(cbdir)

    log.info("Application directory initialized")

    if get_started_hint:
        log.info("\n{hint}\n", hint=get_started_hint)
    else:
        log.info("\nTo start your node, run 'crossbar start --cbdir {cbdir}'\n", cbdir=os.path.abspath(cbdir))


def _run_command_status(options, reactor, personality):
    """
    Subcommand "crossbar status".
    """
    log = make_logger()

    # https://docs.python.org/2/library/os.html#os.EX_UNAVAILABLE
    # https://www.freebsd.org/cgi/man.cgi?query=sysexits&sektion=3
    _EXIT_ERROR = getattr(os, "EX_UNAVAILABLE", 1)

    # check if there is a Crossbar.io instance currently running from
    # the Crossbar.io node directory at all
    pid_data = _check_is_running(options.cbdir)

    # optional current state to assert
    _assert = options.__dict__["assert"]
    if pid_data is None:
        if _assert == "running":
            log.error("Assert status RUNNING failed: status is {}".format(hl("STOPPED", color="red", bold=True)))
            sys.exit(_EXIT_ERROR)
        elif _assert == "stopped":
            log.info("Assert status STOPPED succeeded: status is {}".format(hl("STOPPED", color="green", bold=True)))
            sys.exit(0)
        else:
            log.info("Status is {}".format(hl("STOPPED", color="white", bold=True)))
            sys.exit(0)
    else:
        if _assert == "running":
            log.info("Assert status RUNNING succeeded: status is {}".format(hl("RUNNING", color="green", bold=True)))
            sys.exit(0)
        elif _assert == "stopped":
            log.error("Assert status STOPPED failed: status is {}".format(hl("RUNNING", color="red", bold=True)))
            sys.exit(_EXIT_ERROR)
        else:
            log.info("Status is {}".format(hl("RUNNING", color="white", bold=True)))
            sys.exit(0)


def _run_command_stop(options, reactor, personality):
    """
    Subcommand "crossbar stop".
    """
    # check if there is a Crossbar.io instance currently running from
    # the Crossbar.io node directory at all
    #
    pid_data = _check_is_running(options.cbdir)
    if pid_data:
        pid = pid_data["pid"]
        print("Stopping Crossbar.io currently running from node directory {} (PID {}) ...".format(options.cbdir, pid))
        if not _HAS_PSUTIL:
            if sys.platform == "win32":
                # Windows doesn't accept SIGINT
                os.kill(pid, signal.SIGTERM)
                print("SIGTERM sent to process {}.".format(pid))
            else:
                os.kill(pid, signal.SIGINT)
                print("SIGINT sent to process {}.".format(pid))
        else:
            p = psutil.Process(pid)
            try:
                # first try to interrupt (orderly shutdown)
                _INTERRUPT_TIMEOUT = 5
                # On Windows, SIGINT raises ValueError which is caught below.
                p.send_signal(signal.SIGINT)
                print("SIGINT sent to process {} .. waiting for exit ({} seconds) ...".format(pid, _INTERRUPT_TIMEOUT))
                p.wait(timeout=_INTERRUPT_TIMEOUT)
            except psutil.TimeoutExpired:
                print("... process {} still alive - will _terminate_ now.".format(pid))
                try:
                    _TERMINATE_TIMEOUT = 5
                    p.terminate()
                    print(
                        "SIGTERM sent to process {} .. waiting for exit ({} seconds) ...".format(
                            pid, _TERMINATE_TIMEOUT
                        )
                    )
                    p.wait(timeout=_TERMINATE_TIMEOUT)
                except psutil.TimeoutExpired:
                    print("... process {} still alive - will KILL now.".format(pid))
                    p.kill()
                    print("SIGKILL sent to process {}.".format(pid))
                else:
                    print("Process {} terminated.".format(pid))
            else:
                print("Process {} has excited gracefully.".format(pid))
        sys.exit(0)
    else:
        print("No Crossbar.io is currently running from node directory {}.".format(options.cbdir))
        sys.exit(getattr(os, "EX_UNAVAILABLE", 1))


def _start_logging(options, reactor):
    """
    Start the logging in a way that all the subcommands can use it.
    """
    loglevel = getattr(options, "loglevel", "info")
    logformat = getattr(options, "logformat", "none")
    color = getattr(options, "color", "auto")

    set_global_log_level(loglevel)

    # The log observers (things that print to stderr, file, etc)
    observers = []

    if getattr(options, "logtofile", False):
        # We want to log to a file
        if not options.logdir:
            logdir = options.cbdir
        else:
            logdir = options.logdir

        logfile = os.path.join(logdir, "node.log")

        if loglevel in ["error", "warn", "info"]:
            show_source = False
        else:
            show_source = True

        observers.append(make_logfile_observer(logfile, show_source))
    else:
        # We want to log to stdout/stderr.

        if color == "auto":
            if sys.__stdout__.isatty():
                color = True
            else:
                color = False
        elif color == "true":
            color = True
        else:
            color = False

        if loglevel == "none":
            # Do no logging!
            pass
        elif loglevel in ["error", "warn", "info"]:
            # Print info to stdout, warn+ to stderr
            observers.append(make_stdout_observer(show_source=False, format=logformat, color=color))
            observers.append(make_stderr_observer(show_source=False, format=logformat, color=color))
        elif loglevel == "debug":
            # Print debug+info to stdout, warn+ to stderr, with the class
            # source
            observers.append(
                make_stdout_observer(
                    show_source=True, levels=(LogLevel.info, LogLevel.debug), format=logformat, color=color
                )
            )
            observers.append(make_stderr_observer(show_source=True, format=logformat, color=color))
        elif loglevel == "trace":
            # Print trace+, with the class source
            observers.append(
                make_stdout_observer(
                    show_source=True, levels=(LogLevel.info, LogLevel.debug), format=logformat, trace=True, color=color
                )
            )
            observers.append(make_stderr_observer(show_source=True, format=logformat, color=color))
        else:
            assert False, "Shouldn't ever get here."

    for observer in observers:
        globalLogPublisher.addObserver(observer)

        # Make sure that it goes away
        reactor.addSystemEventTrigger("after", "shutdown", globalLogPublisher.removeObserver, observer)

    # Actually start the logger.
    start_logging(None, loglevel)


def _run_command_start(options, reactor, personality):
    """
    Subcommand "crossbar start".
    """
    # do not allow to run more than one Crossbar.io instance
    # from the same Crossbar.io node directory
    #
    pid_data = _check_is_running(options.cbdir)
    if pid_data:
        print("Crossbar.io is already running from node directory {} (PID {}).".format(options.cbdir, pid_data["pid"]))
        sys.exit(1)
    else:
        fp = os.path.join(options.cbdir, _PID_FILENAME)
        with open(fp, "wb") as fd:
            argv = options.argv
            options_dump = vars(options)
            pid_data = {
                "pid": os.getpid(),
                "argv": argv,
                "options": {x: y for x, y in options_dump.items() if x not in ["func", "argv"]},
            }
            fd.write(
                "{}\n".format(
                    json.dumps(pid_data, sort_keys=False, indent=4, separators=(", ", ": "), ensure_ascii=False)
                ).encode("utf8")
            )

    # remove node PID file when reactor exits
    #
    def remove_pid_file():
        fp = os.path.join(options.cbdir, _PID_FILENAME)
        if os.path.isfile(fp):
            os.remove(fp)

    reactor.addSystemEventTrigger("after", "shutdown", remove_pid_file)

    log = make_logger()

    # represents the running Crossbar.io node
    #
    enable_vmprof = False
    if _HAS_VMPROF:
        enable_vmprof = options.vmprof

    node_options = personality.NodeOptions(
        debug_lifecycle=options.debug_lifecycle,
        debug_programflow=options.debug_programflow,
        enable_vmprof=enable_vmprof,
    )

    node = personality.Node(personality, options.cbdir, reactor=reactor, options=node_options)

    # print the banner, personality and node directory
    #
    for line in personality.BANNER.splitlines():
        log.info(hl(line, color="yellow", bold=True))
    print()

    log.info(
        "{note} {func}",
        note=hl("Booting {} node ..".format(personality.NAME), color="red", bold=True),
        func=hltype(_run_command_start),
    )

    log.debug('Running on realm="{realm}" from cbdir="{cbdir}"', realm=hlid(node.realm), cbdir=hlid(options.cbdir))

    # check and load the node configuration
    #
    try:
        config_source, config_path = node.load_config(options.config)
    except InvalidConfigException as e:
        log.failure()
        log.error("Invalid node configuration")
        log.error("{e!s}", e=e)
        sys.exit(1)
    except:
        raise
    else:
        config_source = node.CONFIG_SOURCE_TO_STR.get(config_source, None)
        log.info(
            "Node configuration loaded [config_source={config_source}, config_path={config_path}]",
            config_source=hl(config_source, bold=True, color="green"),
            config_path=hlid(config_path),
        )

    # possibly generate new node key
    #
    if node.secmod is None:
        node.load_keys(options.cbdir)

    # if vmprof global profiling is enabled via command line option, this will carry
    # the file where vmprof writes its profile data
    if _HAS_VMPROF:
        _vm_prof = {
            # need to put this into a dict, since FDs are ints, and python closures can't
            # write to this otherwise
            "outfd": None
        }

    # https://twistedmatrix.com/documents/current/api/twisted.internet.interfaces.IReactorCore.html
    # Each "system event" in Twisted, such as 'startup', 'shutdown', and 'persist', has 3 phases:
    # 'before', 'during', and 'after' (in that order, of course). These events will be fired
    # internally by the Reactor.

    def before_reactor_started():
        term_print("CROSSBAR:REACTOR_STARTING")

    def after_reactor_started():
        term_print("CROSSBAR:REACTOR_STARTED")

        if _HAS_VMPROF and options.vmprof:
            outfn = os.path.join(options.cbdir, ".vmprof-controller-{}.dat".format(os.getpid()))
            _vm_prof["outfd"] = os.open(outfn, os.O_RDWR | os.O_CREAT | os.O_TRUNC)
            vmprof.enable(_vm_prof["outfd"], period=0.01)
            term_print("CROSSBAR:VMPROF_ENABLED:{}".format(outfn))

    def before_reactor_stopped():
        term_print("CROSSBAR:REACTOR_STOPPING")

        if _HAS_VMPROF and options.vmprof and _vm_prof["outfd"]:
            vmprof.disable()
            term_print("CROSSBAR:VMPROF_DISABLED")

    def after_reactor_stopped():
        # FIXME: we are indeed reaching this line, however,
        # the log output does not work (it also doesnt work using
        # plain old print). Dunno why.

        # my theory about this issue is: by the time this line
        # is reached, Twisted has already closed the stdout/stderr
        # pipes. hence we do an evil trick: we directly write to
        # the process' controlling terminal
        # https://unix.stackexchange.com/a/91716/52500
        term_print("CROSSBAR:REACTOR_STOPPED")

    reactor.addSystemEventTrigger("before", "startup", before_reactor_started)
    reactor.addSystemEventTrigger("after", "startup", after_reactor_started)
    reactor.addSystemEventTrigger("before", "shutdown", before_reactor_stopped)
    reactor.addSystemEventTrigger("after", "shutdown", after_reactor_stopped)

    # now actually start the node ..
    #
    exit_info = {"was_clean": None}

    def start_crossbar():
        term_print("CROSSBAR:NODE_STARTING")

        #
        # ****** main entry point of node ******
        #
        d = node.start()

        # node started successfully, and later ..
        def on_startup_success(_shutdown_complete):
            term_print("CROSSBAR:NODE_STARTED")

            shutdown_complete = _shutdown_complete["shutdown_complete"]

            # .. exits, signaling exit status _inside_ the result returned
            def on_shutdown_success(shutdown_info):
                exit_info["was_clean"] = shutdown_info["was_clean"]
                log.info("on_shutdown_success: was_clean={was_clean}", shutdown_info["was_clean"])

            # should not arrive here:
            def on_shutdown_error(err):
                exit_info["was_clean"] = False
                log.error("on_shutdown_error: {tb}", tb=failure_format_traceback(err))

            shutdown_complete.addCallbacks(on_shutdown_success, on_shutdown_error)

        # node could not even start
        def on_startup_error(err):
            term_print("CROSSBAR:NODE_STARTUP_FAILED")
            exit_info["was_clean"] = False
            log.error("Could not start node: {tb}", tb=failure_format_traceback(err))
            if reactor.running:
                reactor.stop()

        d.addCallbacks(on_startup_success, on_startup_error)

    # Call a function when the reactor is running. If the reactor has not started, the callable
    # will be scheduled to run when it does start.
    reactor.callWhenRunning(start_crossbar)

    # Special feature to automatically shutdown the node after this many seconds
    if options.shutdownafter:

        @inlineCallbacks
        def _shutdown():
            term_print("CROSSBAR:SHUTDOWN_AFTER_FIRED")
            shutdown_info = yield node.stop()
            exit_info["was_clean"] = shutdown_info["was_clean"]
            term_print("CROSSBAR:SHUTDOWN_AFTER_COMPLETE")

        reactor.callLater(options.shutdownafter, _shutdown)

    # now enter event loop ..
    #
    log.info(hl("Entering event reactor ...", color="green", bold=True))
    term_print("CROSSBAR:REACTOR_ENTERED")
    reactor.run()

    # once the reactor has finally stopped, we get here, and at that point,
    # exit_info['was_clean'] MUST have been set before - either to True or to False
    # (otherwise we are missing a code path to handle in above)

    # exit the program with exit code depending on whether the node has been cleanly shut down
    if exit_info["was_clean"] is True:
        term_print("CROSSBAR:EXIT_WITH_SUCCESS")
        sys.exit(0)

    elif exit_info["was_clean"] is False:
        term_print("CROSSBAR:EXIT_WITH_ERROR")
        sys.exit(1)

    else:
        term_print("CROSSBAR:EXIT_WITH_INTERNAL_ERROR")
        sys.exit(1)


def _run_command_check(options, reactor, personality):
    """
    Subcommand "crossbar check".
    """
    configfile = os.path.join(options.cbdir, options.config)

    verbose = False

    try:
        print("Checking local node configuration file: {}".format(configfile))
        config = personality.check_config_file(personality, configfile)
    except Exception as e:
        print("Error: {}".format(e))
        sys.exit(1)
    else:
        print("Ok, node configuration looks good!")

        if verbose:
            config_content = json.dumps(
                config,
                skipkeys=False,
                sort_keys=False,
                ensure_ascii=False,
                separators=(",", ": "),
                indent=4,
            )
            print(color_json(config_content))

        sys.exit(0)


def _run_command_convert(options, reactor, personality):
    """
    Subcommand "crossbar convert".
    """
    configfile = os.path.join(options.cbdir, options.config)

    print("Converting local configuration file {}".format(configfile))

    try:
        personality.convert_config_file(personality, configfile)
    except Exception as e:
        print("\nError: {}\n".format(e))
        sys.exit(1)
    else:
        sys.exit(0)


def _run_command_upgrade(options, reactor, personality):
    """
    Subcommand "crossbar upgrade".
    """
    configfile = os.path.join(options.cbdir, options.config)

    print("Upgrading local configuration file {}".format(configfile))

    try:
        personality.upgrade_config_file(personality, configfile)
    except Exception as e:
        print("\nError: {}\n".format(e))
        sys.exit(1)
    else:
        sys.exit(0)


def _run_command_keygen(options, reactor, personality):
    """
    Subcommand "crossbar keygen".
    """

    try:
        from autobahn.wamp.cryptobox import KeyRing
    except ImportError:
        print("You should install 'autobahn[encryption]'")
        sys.exit(1)

    priv, pub = KeyRing().generate_key_hex()
    print("  private: {}".format(priv))
    print("   public: {}".format(pub))


def _print_usage(prog, personality):
    print(hl(personality.BANNER, color="yellow", bold=True))
    print(
        'Type "{} --help" to get help, or "{} <command> --help" to get help on a specific command.'.format(prog, prog)
    )
    print('Type "{} legal" to read legal notices, terms of use and license and privacy information.'.format(prog))
    print('Type "{} version" to print detailed version information.'.format(prog))


[docs] def main(prog, args, reactor, personality): """ Entry point of Crossbar.io CLI. """ from crossbar import _util _util.set_flags_from_args(args) term_print("CROSSBAR:MAIN_ENTRY") # print banner and usage notes when started with empty args # if args is not None and "--help" not in args: # if all args are options (start with "-"), then we don't have a command, # but we need one! hence, print a usage message if not [x for x in args if not x.startswith("-")]: _print_usage(prog, personality) return # create the top-level parser # parser = argparse.ArgumentParser(prog=prog, description=personality.DESC) _add_debug_options(parser) # create subcommand parser # subparsers = parser.add_subparsers(dest="command", title="commands", help="Command to run (required)") subparsers.required = True # ############################################################# # "init" command # parser_init = subparsers.add_parser("init", help="Initialize a new Crossbar.io node.") parser_init.add_argument( "--appdir", type=str, default=None, help="Application base directory where to create app and node from template.", ) parser_init.set_defaults(func=_run_command_init) # "start" command # parser_start = subparsers.add_parser("start", help="Start a Crossbar.io node.") _add_log_arguments(parser_start) _add_cbdir_config(parser_start) parser_start.add_argument( "--shutdownafter", type=int, default=None, help="Automatically shutdown node after this many seconds." ) if _HAS_VMPROF: parser_start.add_argument( "--vmprof", action="store_true", help="Profile node controller and native worker using vmprof." ) parser_start.set_defaults(func=_run_command_start) # "stop" command # parser_stop = subparsers.add_parser("stop", help="Stop a Crossbar.io node.") parser_stop.add_argument( "--cbdir", type=str, default=None, help="Crossbar.io node directory (overrides ${CROSSBAR_DIR} and the default ./.crossbar)", ) parser_stop.set_defaults(func=_run_command_stop) # "status" command # parser_status = subparsers.add_parser("status", help="Checks whether a Crossbar.io node is running.") parser_status.add_argument( "--cbdir", type=str, default=None, help="Crossbar.io node directory (overrides ${CROSSBAR_DIR} and the default ./.crossbar)", ) parser_status.add_argument( "--assert", type=str, default=None, choices=["running", "stopped"], help=("If given, assert the node is in this state, otherwise exit with error."), ) parser_status.set_defaults(func=_run_command_status) # "check" command # parser_check = subparsers.add_parser("check", help="Check a Crossbar.io node`s local configuration file.") _add_cbdir_config(parser_check) parser_check.set_defaults(func=_run_command_check) # "convert" command # parser_convert = subparsers.add_parser( "convert", help="Convert a Crossbar.io node`s local configuration file from JSON to YAML or vice versa." ) _add_cbdir_config(parser_convert) parser_convert.set_defaults(func=_run_command_convert) # "upgrade" command # parser_upgrade = subparsers.add_parser( "upgrade", help="Upgrade a Crossbar.io node`s local configuration file to current configuration file format." ) _add_cbdir_config(parser_upgrade) parser_upgrade.set_defaults(func=_run_command_upgrade) # "keygen" command # parser_keygen = subparsers.add_parser( "keygen", help="Generate public/private keypairs for use with autobahn.wamp.cryptobox.KeyRing" ) parser_keygen.set_defaults(func=_run_command_keygen) # "keys" command # parser_keys = subparsers.add_parser( "keys", help="Print Crossbar.io release and node key (public key part by default)." ) parser_keys.add_argument( "--cbdir", type=str, default=None, help="Crossbar.io node directory (overrides ${CROSSBAR_DIR} and the default ./.crossbar)", ) parser_keys.add_argument( "--private", action="store_true", help="Print the node private key instead of the public key." ) parser_keys.set_defaults(func=_run_command_keys) # "version" command # parser_version = subparsers.add_parser("version", help="Print software versions.") parser_version.set_defaults(func=_run_command_version) # "legal" command # parser_legal = subparsers.add_parser("legal", help="Print legal and licensing information.") parser_legal.set_defaults(func=_run_command_legal) # INTERNAL USE! start a worker (this is used by the controller to start worker processes # but cannot be used outside that context. # argparse.SUPPRESS does not work here =( so we obfuscate the name to discourage use. # parser_worker = subparsers.add_parser("_exec_worker", help="Program internal use.") parser_worker = worker_main.get_argument_parser(parser_worker) parser_worker.set_defaults(func=worker_main._run_command_exec_worker) # ############################################################# # parse cmd line args # options = parser.parse_args(args) options.argv = [prog] + args if hasattr(options, "shutdownafter") and options.shutdownafter: options.shutdownafter = float(options.shutdownafter) # colored logging does not work on Windows, so overwrite it! # FIXME: however, printing the banner in color works at least now: # So maybe we can get the actual log output also working in color. if sys.platform == "win32": options.color = False # Crossbar.io node directory # if hasattr(options, "cbdir"): if not options.cbdir: if "CROSSBAR_DIR" in os.environ: options.cbdir = os.environ["CROSSBAR_DIR"] elif os.path.isdir(".crossbar"): options.cbdir = ".crossbar" else: options.cbdir = "." options.cbdir = os.path.abspath(options.cbdir) # convenience: if --cbdir points to a config file, take # the config file's base dirname as node directory if os.path.isfile(options.cbdir): options.cbdir = os.path.dirname(options.cbdir) # convenience: auto-create directory if not existing if not os.path.isdir(options.cbdir): try: os.mkdir(options.cbdir) except Exception as e: print("Could not create node directory: {}".format(e)) sys.exit(1) else: print("Auto-created node directory {}".format(options.cbdir)) # Crossbar.io node configuration file # if hasattr(options, "config"): # if not explicit config filename is given, try to auto-detect . if not options.config: for f in ["config.yaml", "config.json"]: fn = os.path.join(options.cbdir, f) if os.path.isfile(fn) and os.access(fn, os.R_OK): options.config = f break # Log directory # if hasattr(options, "logdir"): if options.logdir: options.logdir = os.path.abspath(os.path.join(options.cbdir, options.logdir)) if not os.path.isdir(options.logdir): try: os.mkdir(options.logdir) except Exception as e: print("Could not create log directory: {}".format(e)) sys.exit(1) else: print("Auto-created log directory {}".format(options.logdir)) # Start the logger # _start_logging(options, reactor) term_print("CROSSBAR:LOGGING_STARTED") # run the subcommand selected # try: options.func(options, reactor=reactor, personality=personality) except SystemExit as e: # SystemExit(0) is okay! Anything other than that is bad and should be # re-raised. if e.args[0] != 0: raise