Source code for crossbar.worker
#####################################################################################
#
# Copyright (c) typedef int GmbH
# SPDX-License-Identifier: EUPL-1.2
#
#####################################################################################
import importlib
import sys
from autobahn.twisted.wamp import ApplicationSession
from autobahn.wamp.exception import ApplicationError
from twisted.python.failure import Failure
from txaio import make_logger
#
# the imports here are important (though not used in CB unless configured),
# because of single-exe packaging and pyinstaller otherwise missing deps
#
from crossbar.worker.sample import LogTester # noqa
[docs]
def _appsession_loader(config):
    """
    Load the application component described by C{config}.

    Two component types are handled, selected by C{config["type"]}:

      - C{"class"}: C{config["classname"]} is a dotted path to a class. The
        module part is imported and the class is returned; it must be a
        subclass of C{ApplicationSession}.
      - C{"function"}: every entry in the optional C{config["callbacks"]}
        mapping (event name -> dotted path to a callable) is imported, and a
        factory function is returned. Calling the factory with a session
        config creates an C{_AnonymousRoleSession}, sets its C{role} from
        C{config["role"]} (default C{"anonymous"}) and registers the
        callbacks on it via C{session.on()}.

    @param config: The component configuration (a dict-like object).

    @return: The component: either the imported class, or a callable taking
        a session config and returning a session instance.

    @raise ApplicationError: C{crossbar.error.class_import_failed} if a class
        or callback cannot be imported (or the class is not derived from
        C{ApplicationSession}), C{crossbar.error} if a callback reference
        contains no C{"."}, and C{crossbar.error.invalid_configuration} for
        an unknown component type.
    """
    log = make_logger()

    if config["type"] == "class":
        # Pre-bind the name so the error handler below can always format its
        # message, even when the "classname" lookup itself is what failed.
        klassname = None
        try:
            klassname = config["classname"]

            log.debug("Starting class '{klass}'", klass=klassname)

            # split "pkg.module.Class" into ("pkg.module", "Class")
            module_name, _, klass_name = klassname.rpartition(".")
            module = importlib.import_module(module_name)
            component = getattr(module, klass_name)

            if not issubclass(component, ApplicationSession):
                raise ApplicationError(
                    "crossbar.error.class_import_failed", "session not derived of ApplicationSession"
                )

        except Exception:
            # Any failure (including the ApplicationError raised just above)
            # is reported uniformly, with the traceback embedded in the message.
            emsg = "Failed to import class '{}'\n{}".format(klassname, Failure().getTraceback())
            # Pass the text as a field instead of as the format string:
            # tracebacks may contain "{...}" which would otherwise be
            # interpreted as format placeholders by the logger.
            log.debug("{msg}", msg=emsg)
            log.debug("PYTHONPATH: {pythonpath}", pythonpath=sys.path)
            raise ApplicationError("crossbar.error.class_import_failed", emsg, pythonpath=sys.path)

    elif config["type"] == "function":
        callbacks = {}
        for name, funcref in config.get("callbacks", {}).items():
            if "." not in funcref:
                raise ApplicationError(
                    "crossbar.error",
                    "no '.' in callback reference '{}'".format(funcref),
                )

            # Split outside the try block so that "package" is always bound
            # when the error handler formats its message.
            package, func = funcref.rsplit(".", 1)
            try:
                module = importlib.import_module(package)
                callbacks[name] = getattr(module, func)
            except Exception:
                emsg = "Failed to import package '{}' (for '{}')\n{}".format(
                    package, funcref, Failure().getTraceback()
                )
                log.error("{msg}", msg=emsg)
                raise ApplicationError("crossbar.error.class_import_failed", emsg)

        # while the "component" callback is usually an
        # ApplicationSession class, it can be anything that takes a
        # "config" arg (must return an ApplicationSession instance)
        def component(cfg):
            session = _AnonymousRoleSession(cfg)
            session.role = config.get("role", "anonymous")
            for name, fn in callbacks.items():
                session.on(name, fn)
            return session

    else:
        raise ApplicationError(
            "crossbar.error.invalid_configuration", "invalid component type '{}'".format(config["type"])
        )

    return component
[docs]
class _AnonymousRoleSession(ApplicationSession):
    """
    Application session that joins its configured realm under the
    authrole stored in C{self.role}.

    C{role} is not set by this class itself; it has to be assigned on the
    instance before C{onConnect} runs.
    """

    def onConnect(self):
        """
        Join the realm taken from the session config, requesting
        C{self.role} as the authrole.
        """
        realm = self.config.realm
        self.join(realm, authrole=self.role)