Source code for crossbar.node.guest
#####################################################################################
#
# Copyright (c) typedef int GmbH
# SPDX-License-Identifier: EUPL-1.2
#
#####################################################################################
import json
from twisted.internet import protocol
from twisted.internet.error import ConnectionDone, ProcessDone, ProcessExitedAlready, ProcessTerminated
from txaio import make_logger
__all__ = ("create_guest_worker_client_factory",)
class GuestWorkerClientProtocol(protocol.Protocol):
log = make_logger()
def __init__(self, config):
self.config = config
def connectionMade(self):
# `self.transport` is now a provider of `twisted.internet.interfaces.IProcessTransport`
# see: http://twistedmatrix.com/documents/current/api/twisted.internet.interfaces.IProcessTransport.html
options = self.config.get("options", {})
self.log.debug("GuestWorkerClientProtocol.connectionMade")
if "stdout" in options and options["stdout"] == "close":
self.transport.closeStdout()
self.log.debug("GuestWorkerClientProtocol: stdout to guest closed")
if "stderr" in options and options["stderr"] == "close":
self.transport.closeStderr()
self.log.debug("GuestWorkerClientProtocol: stderr to guest closed")
if "stdin" in options:
if options["stdin"] == "close":
self.transport.closeStdin()
self.log.debug("GuestWorkerClientProtocol: stdin to guest closed")
else:
if options["stdin"]["type"] == "json":
self.transport.write(json.dumps(options["stdin"]["value"], ensure_ascii=False).encode("utf8"))
self.log.debug("GuestWorkerClientProtocol: JSON value written to stdin on guest")
else:
raise Exception("logic error")
if options["stdin"].get("close", True):
self.transport.closeStdin()
self.log.debug("GuestWorkerClientProtocol: stdin to guest closed")
self.factory._on_ready.callback(self)
def connectionLost(self, reason):
self.log.debug("GuestWorkerClientProtocol.connectionLost: {reason}", reason=reason)
try:
if isinstance(reason.value, (ProcessDone, ConnectionDone)):
self.log.debug("GuestWorkerClientProtocol: guest ended cleanly")
self.factory._on_exit.callback(None)
elif isinstance(reason.value, ProcessTerminated):
self.log.debug("GuestWorkerClientProtocol: guest ended with error {code}", code=reason.value.exitCode)
self.factory._on_exit.errback(reason)
else:
# get this when subprocess has exited early; should we just log error?
# should not arrive here
self.log.error(
"GuestWorkerClientProtocol: INTERNAL ERROR - should not arrive here - {reason}",
reason=reason,
)
except Exception:
self.log.failure("GuestWorkerClientProtocol: INTERNAL ERROR - {log_failure}")
def signal(self, sig="TERM"):
assert sig in ["KILL", "TERM", "INT"]
try:
self.transport.signalProcess(sig)
except ProcessExitedAlready:
pass
except OSError:
self.log.failure(None)
class GuestWorkerClientFactory(protocol.Factory):
def __init__(self, config, on_ready, on_exit):
self.proto = None
self._config = config
self._on_ready = on_ready
self._on_exit = on_exit
def buildProtocol(self, addr):
self.proto = GuestWorkerClientProtocol(self._config)
self.proto.factory = self
return self.proto
def signal(self, sig="TERM"):
assert sig in ["KILL", "TERM", "INT"]
if self.proto:
self.proto.signal(sig)
[docs]
def create_guest_worker_client_factory(config, on_ready, on_exit):
factory = GuestWorkerClientFactory(config, on_ready, on_exit)
return factory