Source code for crossbar.node.native

#####################################################################################
#
#  Copyright (c) typedef int GmbH
#  SPDX-License-Identifier: EUPL-1.2
#
#####################################################################################

from autobahn.twisted.websocket import WampWebSocketClientFactory, WampWebSocketClientProtocol
from twisted.internet.error import ConnectionDone, ProcessDone, ProcessTerminated
from txaio import make_logger

from crossbar._util import hltype

__all__ = ("create_native_worker_client_factory",)


class NativeWorkerClientProtocol(WampWebSocketClientProtocol):
    log = make_logger()

    def connectionMade(self):
        WampWebSocketClientProtocol.connectionMade(self)
        self._pid = self.transport.pid
        self.factory.proto = self

        # native workers are implicitly trusted
        self._authid = "crossbar.process.{}".format(self._pid)
        self._authrole = self.factory._authrole

        # the worker is actively spawned by the node controller,
        # and we talk over the pipes that were create during
        # process creation. this established implicit trust.
        self._authmethod = "trusted"

        # the trust is established implicitly by the way the
        # the client (worker) is created
        self._authprovider = "programcode"

    def connectionLost(self, reason):
        if isinstance(reason.value, ConnectionDone):
            self.log.info("Native worker connection closed cleanly.")
        else:
            self.log.warn("Native worker connection closed uncleanly: {reason}", reason=reason.value)

        WampWebSocketClientProtocol.connectionLost(self, reason)
        self.factory.proto = None

        if isinstance(reason.value, ProcessTerminated):
            if not self.factory._on_ready.called:
                # the worker was never ready in the first place ..
                self.factory._on_ready.errback(reason)
            else:
                # the worker _did_ run (was ready before), but now exited with error
                if not self.factory._on_exit.called:
                    self.factory._on_exit.errback(reason)
                else:
                    self.log.error(
                        "unhandled code path (1) in WorkerClientProtocol.connectionLost: {reason}", reason=reason.value
                    )
        elif isinstance(reason.value, (ProcessDone, ConnectionDone)):
            # the worker exited cleanly
            if not self.factory._on_exit.called:
                self.factory._on_exit.callback(None)
            else:
                self.log.error(
                    "unhandled code path (2) in WorkerClientProtocol.connectionLost: {reason}", reason=reason.value
                )
        else:
            # should not arrive here
            self.log.error(
                "unhandled code path (3) in WorkerClientProtocol.connectionLost: {reason}", reason=reason.value
            )


class NativeWorkerClientFactory(WampWebSocketClientFactory):
    log = make_logger()

    def __init__(self, *args, **kwargs):
        self.log.debug(
            "{func}(*args={_args}, **kwargs={_kwargs})",
            _args=args,
            _kwargs=kwargs,
            func=hltype(NativeWorkerClientFactory.__init__),
        )
        self._authrole = kwargs.pop("authrole")
        WampWebSocketClientFactory.__init__(self, *args, **kwargs)
        self.proto = None

    def buildProtocol(self, addr):
        self.proto = NativeWorkerClientProtocol()
        self.proto.factory = self
        return self.proto

    def stopFactory(self):
        WampWebSocketClientFactory.stopFactory(self)
        if self.proto:
            self.proto.close()
            # self.proto.transport.loseConnection()


[docs] def create_native_worker_client_factory(router_session_factory, authrole, on_ready, on_exit): """ Create a transport factory for talking to native workers. The node controller talks WAMP-WebSocket-over-STDIO with spawned (native) workers. The node controller runs a client transport factory, and the native worker runs a server transport factory. This is a little non-intuitive, but just the way that Twisted works when using STDIO transports. :param router_session_factory: Router session factory to attach to. :type router_session_factory: :class:`crossbar.router.session.RouterSessionFactory` """ factory = NativeWorkerClientFactory(router_session_factory, "ws://localhost", authrole=authrole) # we need to increase the opening handshake timeout in particular, since starting up a worker # on PyPy will take a little (due to JITting) factory.setProtocolOptions(failByDrop=False, openHandshakeTimeout=90, closeHandshakeTimeout=5) # on_ready is resolved in crossbar/node/process.py:on_worker_ready around 175 # after crossbar.node.<ID>.on_worker_ready is published to (in the controller session) # that happens in crossbar/worker/worker.py:publish_ready which itself happens when # the native worker joins the realm (part of onJoin) factory._on_ready = on_ready factory._on_exit = on_exit return factory