#####################################################################################
#
# Copyright (c) typedef int GmbH
# SPDX-License-Identifier: EUPL-1.2
#
#####################################################################################
import argparse
import binascii
import importlib
import cbor2
from twisted.internet.error import ReactorNotRunning
from crossbar._util import _add_debug_options, hl, hlid, hltype, term_print
try:
import vmprof
_HAS_VMPROF = True
except ImportError:
_HAS_VMPROF = False
__all__ = ("_run_command_exec_worker",)
def get_argument_parser(parser=None):
if not parser:
parser = argparse.ArgumentParser()
_add_debug_options(parser)
parser.add_argument(
"--reactor",
default=None,
choices=["select", "poll", "epoll", "kqueue", "iocp"],
help="Explicit Twisted reactor selection (optional).",
)
parser.add_argument(
"--loglevel",
default="info",
choices=["none", "error", "warn", "info", "debug", "trace"],
help="Initial log level.",
)
parser.add_argument("-c", "--cbdir", type=str, required=True, help="Crossbar.io node directory (required).")
parser.add_argument(
"-r", "--realm", type=str, required=True, help="Crossbar.io node (management) realm (required)."
)
parser.add_argument(
"-p",
"--personality",
required=True,
type=str,
help='Crossbar.io personality _class_ name, eg "crossbar.personality.Personality" (required).',
)
parser.add_argument(
"-k",
"--klass",
required=True,
type=str,
help='Crossbar.io worker class, eg "crossbar.worker.container.ContainerController" (required).',
)
parser.add_argument("-n", "--node", required=True, type=str, help="Crossbar.io node ID (required).")
parser.add_argument("-w", "--worker", type=str, required=True, help="Crossbar.io worker ID (required).")
parser.add_argument(
"-e",
"--extra",
type=str,
required=False,
help="Crossbar.io worker extra configuration from worker.options.extra (optional).",
)
parser.add_argument("--title", type=str, default=None, help="Worker process title to set (optional).")
parser.add_argument(
"--expose_controller",
type=bool,
default=False,
help="Expose node controller session to all components (this feature requires crossbar).",
)
parser.add_argument(
"--expose_shared",
type=bool,
default=False,
help="Expose a shared object to all components (this feature requires crossbar).",
)
parser.add_argument("--shutdown", type=str, default=None, help="Shutdown method")
parser.add_argument("--restart", type=str, default=None, help="Restart method")
if _HAS_VMPROF:
parser.add_argument(
"--vmprof", action="store_true", help="Profile node controller and native worker using vmprof."
)
return parser
[docs]
def _run_command_exec_worker(options, reactor=None, personality=None):
"""
Entry point into (native) worker processes. This wires up stuff such that
a worker instance is talking WAMP-over-stdio to the node controller.
"""
import os
import platform
import signal
import sys
# https://coverage.readthedocs.io/en/coverage-4.4.2/subprocess.html#measuring-sub-processes
MEASURING_COVERAGE = False
if "COVERAGE_PROCESS_START" in os.environ:
try:
import coverage
except ImportError:
pass
else:
# The following will read the environment variable COVERAGE_PROCESS_START,
# and that should be set to the .coveragerc file:
#
# export COVERAGE_PROCESS_START=${PWD}/.coveragerc
#
coverage.process_startup()
MEASURING_COVERAGE = True
# we use an Autobahn utility to import the "best" available Twisted reactor
from autobahn.twisted.choosereactor import install_reactor
reactor = install_reactor(explicit_reactor=options.reactor or os.environ.get("CROSSBAR_REACTOR", None))
# make sure logging to something else than stdio is setup _first_
from twisted.logger import globalLogPublisher
from twisted.python.reflect import qual
from txaio import make_logger, start_logging
from crossbar._logging import cb_logging_aware, make_JSON_observer
log = make_logger()
# Print a magic phrase that tells the capturing logger that it supports
# Crossbar's rich logging
print(cb_logging_aware, file=sys.__stderr__)
sys.__stderr__.flush()
flo = make_JSON_observer(sys.__stderr__)
globalLogPublisher.addObserver(flo)
term_print("CROSSBAR[{}]:WORKER_STARTING".format(options.worker))
# Ignore SIGINT so we get consistent behavior on control-C versus
# sending SIGINT to the controller process. When the controller is
# shutting down, it sends TERM to all its children but ctrl-C
# handling will send a SIGINT to all the processes in the group
# (so then the controller sends a TERM but the child already or
# will very shortly get a SIGINT as well). Twisted installs signal
# handlers, but not for SIGINT if there's already a custom one
# present.
def ignore(sig, frame):
log.debug("Ignoring SIGINT in worker.")
signal.signal(signal.SIGINT, ignore)
# actually begin logging
start_logging(None, options.loglevel)
# get personality klass, eg "crossbar.personality.Personality"
l = options.personality.split(".")
personality_module, personality_klass = ".".join(l[:-1]), l[-1]
# now load the personality module and class
_mod = importlib.import_module(personality_module)
Personality = getattr(_mod, personality_klass)
# get worker klass, eg "crossbar.worker.container.ContainerController"
l = options.klass.split(".")
worker_module, worker_klass = ".".join(l[:-1]), l[-1]
# now load the worker module and class
_mod = importlib.import_module(worker_module)
klass = getattr(_mod, worker_klass)
log.info(
'Starting {worker_type}-worker "{worker_id}" on node "{node_id}" (personality "{personality}") and local node management realm "{realm}" .. {worker_class}',
worker_type=hl(klass.WORKER_TYPE),
worker_id=hlid(options.worker),
node_id=hlid(options.node),
realm=hlid(options.realm),
personality=hl(Personality.NAME),
worker_class=hltype(klass),
)
log.info(
"Running as PID {pid} on {python}-{reactor}",
pid=os.getpid(),
python=platform.python_implementation(),
reactor=qual(reactor.__class__).split(".")[-1],
)
if MEASURING_COVERAGE:
log.info(
hl("Code coverage measurements enabled (coverage={coverage_version}).", color="green", bold=True),
coverage_version=coverage.__version__,
)
# set process title if requested to
#
try:
import setproctitle
except ImportError:
log.debug("Could not set worker process title (setproctitle not installed)")
else:
if options.title:
setproctitle.setproctitle(options.title)
else:
setproctitle.setproctitle("crossbar-worker [{}]".format(options.klass))
# node directory
#
options.cbdir = os.path.abspath(options.cbdir)
os.chdir(options.cbdir)
# log.msg("Starting from node directory {}".format(options.cbdir))
# set process title if requested to
#
try:
import setproctitle
except ImportError:
log.debug("Could not set worker process title (setproctitle not installed)")
else:
if options.title:
setproctitle.setproctitle(options.title)
else:
setproctitle.setproctitle("crossbar-worker [{}]".format(options.klass))
from autobahn.twisted.websocket import WampWebSocketServerProtocol
from twisted.internet.error import ConnectionDone
class WorkerServerProtocol(WampWebSocketServerProtocol):
def connectionLost(self, reason):
# the behavior here differs slightly whether we're shutting down orderly
# or shutting down because of "issues"
if isinstance(reason.value, ConnectionDone):
was_clean = True
else:
was_clean = False
try:
# this log message is unlikely to reach the controller (unless
# only stdin/stdout pipes were lost, but not stderr)
if was_clean:
log.info("Connection to node controller closed cleanly")
else:
log.warn("Connection to node controller lost: {reason}", reason=reason)
# give the WAMP transport a change to do it's thing
WampWebSocketServerProtocol.connectionLost(self, reason)
except:
# we're in the process of shutting down .. so ignore ..
pass
finally:
# after the connection to the node controller is gone,
# the worker is "orphane", and should exit
# determine process exit code
if was_clean:
exit_code = 0
else:
exit_code = 1
# exit the whole worker process when the reactor has stopped
reactor.addSystemEventTrigger("after", "shutdown", os._exit, exit_code)
# stop the reactor
try:
reactor.stop()
except ReactorNotRunning:
pass
# if vmprof global profiling is enabled via command line option, this will carry
# the file where vmprof writes its profile data
if _HAS_VMPROF:
_vm_prof = {
# need to put this into a dict, since FDs are ints, and python closures can't
# write to this otherwise
"outfd": None
}
# https://twistedmatrix.com/documents/current/api/twisted.internet.interfaces.IReactorCore.html
# Each "system event" in Twisted, such as 'startup', 'shutdown', and 'persist', has 3 phases:
# 'before', 'during', and 'after' (in that order, of course). These events will be fired
# internally by the Reactor.
def before_reactor_started():
term_print("CROSSBAR[{}]:REACTOR_STARTING".format(options.worker))
def after_reactor_started():
term_print("CROSSBAR[{}]:REACTOR_STARTED".format(options.worker))
if _HAS_VMPROF and options.vmprof:
outfn = os.path.join(options.cbdir, ".vmprof-worker-{}-{}.dat".format(options.worker, os.getpid()))
_vm_prof["outfd"] = os.open(outfn, os.O_RDWR | os.O_CREAT | os.O_TRUNC)
vmprof.enable(_vm_prof["outfd"], period=0.01)
term_print("CROSSBAR[{}]:VMPROF_ENABLED:{}".format(options.worker, outfn))
def before_reactor_stopped():
term_print("CROSSBAR[{}]:REACTOR_STOPPING".format(options.worker))
if _HAS_VMPROF and options.vmprof and _vm_prof["outfd"]:
vmprof.disable()
term_print("CROSSBAR[{}]:VMPROF_DISABLED".format(options.worker))
def after_reactor_stopped():
# FIXME: we are indeed reaching this line, however,
# the log output does not work (it also doesnt work using
# plain old print). Dunno why.
# my theory about this issue is: by the time this line
# is reached, Twisted has already closed the stdout/stderr
# pipes. hence we do an evil trick: we directly write to
# the process' controlling terminal
# https://unix.stackexchange.com/a/91716/52500
term_print("CROSSBAR[{}]:REACTOR_STOPPED".format(options.worker))
reactor.addSystemEventTrigger("before", "startup", before_reactor_started)
reactor.addSystemEventTrigger("after", "startup", after_reactor_started)
reactor.addSystemEventTrigger("before", "shutdown", before_reactor_stopped)
reactor.addSystemEventTrigger("after", "shutdown", after_reactor_stopped)
try:
# define a WAMP application session factory
#
from autobahn.wamp.types import ComponentConfig
def make_session():
session_config = ComponentConfig(realm=options.realm, extra=options)
session = klass(config=session_config, reactor=reactor, personality=Personality)
return session
# create a WAMP-over-WebSocket transport server factory
#
from autobahn.twisted.websocket import WampWebSocketServerFactory
transport_factory = WampWebSocketServerFactory(make_session, "ws://localhost")
transport_factory.protocol = WorkerServerProtocol
transport_factory.setProtocolOptions(failByDrop=False)
# create a protocol instance and wire up to stdio
#
from twisted.internet import stdio
from twisted.python.runtime import platform as _platform
proto = transport_factory.buildProtocol(None)
if _platform.isWindows():
stdio.StandardIO(proto)
else:
stdio.StandardIO(proto, stdout=3)
# now start reactor loop
#
log.info(hl("Entering event reactor ...", color="green", bold=True))
reactor.run()
except Exception as e:
log.info("Unhandled exception: {e}", e=e)
if reactor.running:
reactor.addSystemEventTrigger("after", "shutdown", os._exit, 1)
reactor.stop()
else:
sys.exit(1)
if __name__ == "__main__":
import sys
from crossbar import _util
_args = sys.argv[1:]
_util.set_flags_from_args(_args)
parser = get_argument_parser()
args = parser.parse_args(_args)
if args.extra:
args.extra = cbor2.loads(binascii.a2b_hex(args.extra))
_run_command_exec_worker(args)