# Source code for crossbar.common.twisted.processutil
#####################################################################################
#
# Copyright (c) typedef int GmbH
# SPDX-License-Identifier: EUPL-1.2
#
#####################################################################################
from twisted.internet import defer
from twisted.internet.address import _ProcessAddress
from twisted.internet.endpoints import ProcessEndpoint, _WrapIProtocol
from twisted.python.runtime import platform
# Public API of this module: only the endpoint class is exported. The
# underscore-prefixed protocol wrapper defined below is deliberately not
# listed here.
__all__ = ("WorkerProcessEndpoint",)
if platform.isWindows():
    # On Windows only FDs 0, 1 and 2 are in use, so there is no extra
    # communication FD to special-case.

    class _WorkerWrapIProtocol(_WrapIProtocol):  # type: ignore
        """
        IProcessProtocol adapter around an IProtocol that diverts data
        arriving on the worker's log FDs (``_worker._log_fds``) to the
        worker's ``log()`` method.

        The ``_worker`` attribute is not set by this class; it must be
        assigned on the instance before any child data arrives.
        """

        def childDataReceived(self, childFD, data):
            """
            Handle a chunk of data written by the child process.

            The chunk size is always reported to ``track_stats()``. Data on
            a log FD is passed to ``log()``; data on any other FD is left to
            ``_WrapIProtocol``.

            :param childFD: File descriptor (child side) the data came from.
            :param data: The chunk of data received.
            """
            worker = self._worker

            # track bytes received per child FD
            worker.track_stats(childFD, len(data))

            if childFD not in worker._log_fds:
                _WrapIProtocol.childDataReceived(self, childFD, data)
                return

            worker.log(childFD, data)

else:
    # On UNIX-likes FD1/2 are logged, and FD3 is used for our own
    # communication with the child.

    class _WorkerWrapIProtocol(_WrapIProtocol):  # type: ignore
        """
        IProcessProtocol adapter around an IProtocol that diverts data
        arriving on the worker's log FDs (``_worker._log_fds``) to the
        worker's ``log()`` method and feeds FD3 data to the wrapped protocol.

        The ``_worker`` attribute is not set by this class; it must be
        assigned on the instance before any child data arrives.
        """

        def childDataReceived(self, childFD, data):
            """
            Handle a chunk of data written by the child process.

            The chunk size is always reported to ``track_stats()``. Data on
            a log FD is passed to ``log()``, data on FD3 is delivered to the
            wrapped protocol's ``dataReceived()``, and data on any other FD
            is left to ``_WrapIProtocol``.

            :param childFD: File descriptor (child side) the data came from.
            :param data: The chunk of data received.
            """
            worker = self._worker

            # track bytes received per child FD
            worker.track_stats(childFD, len(data))

            if childFD in worker._log_fds:
                worker.log(childFD, data)
                return

            if childFD == 3:
                self.protocol.dataReceived(data)
                return

            _WrapIProtocol.childDataReceived(self, childFD, data)
[docs]
class WorkerProcessEndpoint(ProcessEndpoint):
    """
    Process endpoint specialised for workers.

    Behaves like ``ProcessEndpoint`` but wraps the built protocol in
    ``_WorkerWrapIProtocol`` and attaches the worker to that wrapper.

    :see: http://twistedmatrix.com/documents/current/api/twisted.internet.endpoints.ProcessEndpoint.html
    """

    def __init__(self, *args, **kwargs):
        """
        Create the endpoint.

        The required ``worker`` keyword argument is removed from ``kwargs``
        (a missing key raises ``KeyError``); everything else is handed on
        unchanged to ``ProcessEndpoint.__init__``.

        :param worker: The worker this endpoint is being used for.
        :type worker: instance of WorkerProcess
        """
        worker = kwargs.pop("worker")
        self._worker = worker
        ProcessEndpoint.__init__(self, *args, **kwargs)

    def connect(self, protocolFactory):
        """
        See base class.

        Builds a protocol from ``protocolFactory``, wraps it for the worker
        and spawns the process.

        :param protocolFactory: Factory whose ``buildProtocol()`` supplies
            the protocol to connect to the spawned process.
        :returns: A Deferred that has already fired with the built protocol,
            or has already failed with whatever was raised while wrapping
            the protocol or spawning the process.
        """
        protocol = protocolFactory.buildProtocol(_ProcessAddress())

        try:
            process_protocol = _WorkerWrapIProtocol(
                protocol, self._executable, self._errFlag
            )
            process_protocol._worker = self._worker

            spawn_args = (
                self._executable,
                self._args,
                self._env,
                self._path,
                self._uid,
                self._gid,
                self._usePTY,
                self._childFDs,
            )
            self._spawnProcess(process_protocol, *spawn_args)
        except BaseException:
            # Must stay inside the handler: defer.fail() without arguments
            # builds its Failure from the exception currently being handled.
            return defer.fail()

        return defer.succeed(protocol)