###############################################################################
#
# Crossbar.io Shell
# Copyright (c) typedef int GmbH. Licensed under EUPLv1.2.
#
###############################################################################
import argparse
import binascii
import os
import sys
import txaio
from autobahn.twisted.wamp import ApplicationRunner, ApplicationSession
from autobahn.wamp import cryptosign
from autobahn.wamp.exception import ApplicationError
from autobahn.websocket.util import parse_url
__all__ = ("ShellClient", "ManagementClientSession", "run", "create_management_session")
from twisted.internet.defer import Deferred
from crossbar.shell.command import CmdListManagementRealms
class ShellClient(ApplicationSession):
    """
    Management CLI application session.

    The session is driven by the ``extra`` dict of its component configuration
    (``self.config.extra``). The keys read by this class are:

    * ``key``: signing key used for WAMP-cryptosign; its ``public_key()`` and
      ``sign_challenge()`` methods are called. When absent, the session joins
      with the ``anonymous`` authentication method instead.
    * ``command``: optional object whose ``run(session)`` is awaited after joining.
    * ``main``: optional callable ``main(session)`` awaited after joining when
      no ``command`` is configured.
    * ``done``: optional future, resolved with ``(details, result)`` or rejected
      with the error once the command/main function has finished.
    * ``authid``, ``authrole``, ``trustroot``, ``challenge``, ``channel_binding``,
      ``activation_code``, ``request_new_activation_code``: optional values
      forwarded to the router when joining.
    """

    def onConnect(self):  # noqa: N802
        """
        Cache the configured key/command/main function and request to join the realm.

        Joins using WAMP-cryptosign when a key is configured, and using the
        ``anonymous`` method otherwise.
        """
        self.log.info("{klass}.onConnect()", klass=self.__class__.__name__)

        config_extra = self.config.extra

        # the key is optional: a configuration without one means "join anonymously"
        self._key = config_extra.get("key", None)
        self._command = config_extra.get("command", None)
        self._main = config_extra.get("main", None)

        if self._key is None:
            self.join(
                self.config.realm,
                authmethods=["anonymous"],
                authid=config_extra.get("authid", None),
                authrole=config_extra.get("authrole", None),
            )
            return

        # authentication extra information for wamp-cryptosign
        extra = {
            # forward the client pubkey: this allows us to omit authid as
            # the router can identify us with the pubkey already
            "pubkey": self._key.public_key(),
            # not yet implemented. a public key the router should provide
            # a trustchain for its public key. the trustroot can eg be
            # hard-coded in the client, or come from a command line option.
            "trustroot": config_extra.get("trustroot", None),
            # not yet implemented. for authenticating the router, this
            # challenge will need to be signed by the router and sent back
            # in AUTHENTICATE for the client to verify. A string with a hex
            # encoded 32 bytes random value.
            "challenge": config_extra.get("challenge", None),
            # use TLS channel binding
            "channel_binding": config_extra.get("channel_binding", None),
        }

        # used for user login/registration activation code: only forwarded when set
        for k in ["activation_code", "request_new_activation_code"]:
            if config_extra.get(k):
                extra[k] = config_extra[k]

        # now request to join ..
        self.join(
            self.config.realm,
            authmethods=["cryptosign"],
            authid=config_extra.get("authid", None),
            authrole=config_extra.get("authrole", None),
            authextra=extra,
        )

    def onChallenge(self, challenge):  # noqa: N802
        """
        Sign the router's challenge with the configured private key.

        :param challenge: the challenge received from the router.
        :returns: the value returned by ``sign_challenge()``; ``None`` when
            signing failed (the failure is logged and ``leave()`` is called).
        """
        self.log.info("{klass}.onChallenge(challenge={challenge})", klass=self.__class__.__name__, challenge=challenge)

        # sign and send back the challenge with our private key.
        try:
            channel_id_type = self.config.extra.get("channel_binding", None)
            # the transport may not expose any channel IDs at all (attribute is None)
            channel_ids = self.transport.transport_details.channel_id or {}
            channel_id = channel_ids.get(channel_id_type, None)
            sig = self._key.sign_challenge(challenge, channel_id=channel_id, channel_id_type=channel_id_type)
        except Exception as e:
            self.log.failure()
            self.leave(ApplicationError.AUTHENTICATION_FAILED, str(e))
        else:
            return sig

    async def onJoin(self, details):  # noqa: N802
        """
        Run the configured command (or main function) and report the outcome.

        The outcome is published on the optional ``done`` future: resolved with
        ``(details, result)`` on success, rejected with the raised exception on
        failure. When a main function is configured, the session leaves the
        realm afterwards.

        :param details: the session details passed in when the realm was joined.
        """
        self.log.info("{klass}.onJoin(details={details})", klass=self.__class__.__name__, details=details)

        done = self.config.extra.get("done", None)

        result = None
        error = None
        if self._command:
            self.log.info("{klass}: running command {command}", klass=self.__class__.__name__, command=self._command)
            try:
                result = await self._command.run(self)
                self.log.info("command run with result {result}", result=result)
            except Exception as e:
                self.log.warn("command failed: {error}", error=e)
                error = e
        elif self._main:
            self.log.info("{klass}: running main function {main}", klass=self.__class__.__name__, main=self._main)
            try:
                result = await self._main(self)
                self.log.info("main run with result {result}", result=result)
            except Exception as e:
                self.log.warn("main failed: {error}", error=e)
                error = e
        else:
            self.log.info("{klass}: no command or main function to run!", klass=self.__class__.__name__)

        # only settle the future if nobody else (e.g. onLeave) has done so already
        if done and not txaio.is_called(done):
            if error:
                self.log.warn(
                    "{klass}: command returned with error ({error})", klass=self.__class__.__name__, error=error
                )
                txaio.reject(done, error)
            else:
                self.log.info(
                    "{klass}: command returned with success ({result})", klass=self.__class__.__name__, result=result
                )
                txaio.resolve(done, (details, result))

        self.log.info("{klass}.onJoin(): finished!", klass=self.__class__.__name__)

        if self._main:
            self.leave()

    def onLeave(self, details):  # noqa: N802
        """
        Reject the pending ``done`` future on an abnormal close, then disconnect.

        :param details: close details; ``reason`` and ``message`` are read from it.
        """
        self.log.info("{klass}.onLeave(details={details})", klass=self.__class__.__name__, details=details)

        # anything but a normal close (eg reason=<wamp.error.authentication_failed>)
        # is reported as an error to whoever waits on "done"
        if details.reason != "wamp.close.normal":
            done = self.config.extra.get("done", None)
            error = ApplicationError(details.reason, details.message)
            if done and not txaio.is_called(done):
                self.log.warn(
                    "{klass}: command returned with error ({error})", klass=self.__class__.__name__, error=error
                )
                txaio.reject(done, error)

        self.disconnect()

    def onDisconnect(self):  # noqa: N802
        """
        When running a main function, stop the runner and the Twisted reactor.

        Sessions without a main function leave runner and reactor untouched.
        """
        self.log.info("{klass}.onDisconnect()", klass=self.__class__.__name__)

        if self._main:
            try:
                self.config.runner.stop()
                self.disconnect()
            except Exception:
                # best effort: log the problem, but still stop the reactor below
                self.log.failure()

            from twisted.internet import reactor

            if reactor.running:
                reactor.stop()
class ManagementClientSession(ApplicationSession):
    """
    Session that authenticates with WAMP-cryptosign and hands itself to the caller.

    The keys read from ``self.config.extra`` are:

    * ``key`` (required): signing key; its ``public_key()`` and
      ``sign_challenge()`` methods are called.
    * ``ready``: optional future, fired with ``(session, details)`` once the
      realm has been joined.
    * ``authid``, ``authrole``, ``trustroot``, ``challenge``, ``channel_binding``,
      ``activation_code``, ``request_new_activation_code``: optional values
      forwarded to the router when joining.
    """

    def onConnect(self):
        """Request to join the configured realm using WAMP-cryptosign."""
        config_extra = self.config.extra
        self._key = config_extra["key"]

        extra = {
            "pubkey": self._key.public_key(),
            "trustroot": config_extra.get("trustroot", None),
            "challenge": config_extra.get("challenge", None),
            "channel_binding": config_extra.get("channel_binding", None),
        }

        # activation codes are only forwarded when actually set
        for k in ["activation_code", "request_new_activation_code"]:
            if config_extra.get(k):
                extra[k] = config_extra[k]

        self.join(
            self.config.realm,
            authmethods=["cryptosign"],
            authid=config_extra.get("authid", None),
            authrole=config_extra.get("authrole", None),
            authextra=extra,
        )

    def onChallenge(self, challenge):
        """
        Sign the router's challenge with the configured private key.

        :param challenge: the challenge received from the router.
        :returns: the value returned by the key's ``sign_challenge()``.
        """
        channel_id_type = self.config.extra.get("channel_binding", None)
        # the transport may not expose any channel IDs at all (attribute is None)
        channel_ids = self.transport.transport_details.channel_id or {}
        channel_id = channel_ids.get(channel_id_type, None)
        return self._key.sign_challenge(challenge, channel_id=channel_id, channel_id_type=channel_id_type)

    def onJoin(self, details):
        """
        Fire the optional ``ready`` future with ``(session, details)``.

        The future is fired at most once, so joining again does not raise
        on an already-fired future.

        :param details: the session details passed in when the realm was joined.
        """
        ready = self.config.extra.get("ready", None)
        if ready is not None and not txaio.is_called(ready):
            ready.callback((self, details))

    def onLeave(self, reason):
        """
        Disconnect the transport once the session has left the realm.

        :param reason: value passed in by the framework on leave; not inspected here.
        """
        self.disconnect()
def create_management_session(
    url="wss://master.xbr.network/ws", realm="com.crossbario.fabric", privkey_file="default.priv"
):
    """
    Start a :class:`ManagementClientSession` authenticated with the user's key.

    Logging is started at level "info", the key file is read, and an
    ``ApplicationRunner`` is run with ``start_reactor=False`` (so running the
    reactor is presumably left to the caller).

    :param url: WebSocket URL of the management service.
    :param realm: the realm to join.
    :param privkey_file: key file name, resolved relative to ``~/.crossbar``
        (an absolute path is used as is, per ``os.path.join`` semantics).
    :returns: the Deferred stored under ``extra["ready"]``, which
        ``ManagementClientSession.onJoin`` fires with ``(session, details)``.
    :raises Exception: if the key file does not exist, or contains no
        ``private-key-ed25519`` or no ``user-id`` entry.
    """
    txaio.start_logging(level="info")

    privkey_file = os.path.join(os.path.expanduser("~/.crossbar"), privkey_file)

    # for authenticating the management client, we need a Ed25519 public/private key pair
    # here, we are reusing the user key - so this needs to exist before
    if not os.path.exists(privkey_file):
        raise Exception("private key file {} does not exist".format(privkey_file))

    privkey_hex = None
    user_id = None
    with open(privkey_file, "r") as f:
        for line in f:
            # entries look like "<tag>: <value>"; partition() tolerates a missing
            # colon, and an empty value is treated like a missing entry
            if line.startswith("private-key-ed25519"):
                privkey_hex = line.partition(":")[2].strip() or None
            elif line.startswith("user-id"):
                user_id = line.partition(":")[2].strip() or None

    if privkey_hex is None:
        raise Exception("no private key found in keyfile!")

    if user_id is None:
        raise Exception("no user ID found in keyfile!")

    # only the first field (whether the URL is a secure one) is needed here
    url_is_secure, _, _, _, _, _ = parse_url(url)

    key = cryptosign.CryptosignKey.from_bytes(binascii.a2b_hex(privkey_hex))

    extra = {
        "key": key,
        "authid": user_id,
        "ready": Deferred(),
        "return_code": None,
        "command": CmdListManagementRealms(),
        # WAMP-cryptosign TLS channel binding
        "channel_binding": "tls-unique" if url_is_secure else None,
    }

    runner = ApplicationRunner(url=url, realm=realm, extra=extra)
    runner.run(ManagementClientSession, start_reactor=False)

    return extra["ready"]
def run(main=None, parser=None):
    """
    Parse the command line, then run a :class:`ShellClient` session.

    After the runner returns, the process exits with ``extra["return_code"]``
    if that has been set to a non-zero integer in the meantime.

    :param main: optional callable passed to the session as ``extra["main"]``;
        ``ShellClient`` awaits ``main(session)`` after joining.
    :param parser: optional ``argparse.ArgumentParser`` to extend; the options
        ``--debug``, ``--url``, ``--realm``, ``--keyfile`` and ``--authmethod``
        are added to it. A new parser is created when omitted.
    :raises Exception: for cryptosign authentication, if the key file does not
        exist, or contains no ``private-key-ed25519`` or no ``user-id`` entry.
    """
    # parse command line arguments
    if parser is None:
        parser = argparse.ArgumentParser()

    parser.add_argument(
        "--debug", dest="debug", action="store_true", default=False, help='Enable logging at level "debug".'
    )
    parser.add_argument(
        "--url",
        dest="url",
        type=str,
        default="wss://master.xbr.network/ws",
        help="Management service of the XBR Network (default: wss://master.xbr.network/ws)",
    )
    parser.add_argument(
        "--realm", dest="realm", type=str, default=None, help="The (management) realm to join on the management server"
    )
    parser.add_argument(
        "--keyfile",
        dest="keyfile",
        type=str,
        default=None,
        help="The private client key file to use for authentication.",
    )
    parser.add_argument(
        "--authmethod",
        dest="authmethod",
        type=str,
        default="cryptosign",
        choices=("cryptosign", "anonymous"),
        help="Authentication method: cryptosign or anonymous",
    )

    args = parser.parse_args()

    txaio.start_logging(level="debug" if args.debug else "info")

    # without --keyfile, cryptosign falls back to the default user key location
    # instead of failing on a None path
    if args.keyfile is None and args.authmethod == "cryptosign":
        args.keyfile = os.path.join("~", ".crossbar", "default.priv")

    if args.keyfile is not None:
        args.keyfile = os.path.abspath(os.path.expanduser(args.keyfile))
        print("using keyfile from", args.keyfile)

    extra = None
    if args.authmethod == "cryptosign":
        # for authenticating the management client, we need a Ed25519 public/private key pair
        # here, we are reusing the user key - so this needs to exist before
        privkey_file = args.keyfile

        if not os.path.exists(privkey_file):
            raise Exception("private key file {} does not exist".format(privkey_file))

        privkey_hex = None
        user_id = None
        with open(privkey_file, "r") as f:
            for line in f:
                # entries look like "<tag>: <value>"; partition() tolerates a missing
                # colon, and an empty value is treated like a missing entry
                if line.startswith("private-key-ed25519"):
                    privkey_hex = line.partition(":")[2].strip() or None
                elif line.startswith("user-id"):
                    user_id = line.partition(":")[2].strip() or None

        if privkey_hex is None:
            raise Exception("no private key found in keyfile!")

        if user_id is None:
            raise Exception("no user ID found in keyfile!")

        key = cryptosign.CryptosignKey.from_bytes(binascii.a2b_hex(privkey_hex))

        extra = {"args": args, "key": key, "authid": user_id, "main": main, "return_code": None}

    elif args.authmethod == "anonymous":
        extra = {"args": args, "main": main, "return_code": None}

    else:
        # unreachable with the "choices" restriction above; kept as a safety net
        raise Exception("logic error")

    runner = ApplicationRunner(url=args.url, realm=args.realm, extra=extra)
    runner.run(ShellClient)

    # "return_code" starts out as None; a non-zero integer stored there while
    # the session ran becomes the process exit code
    return_code = extra["return_code"]
    if isinstance(return_code, int) and return_code != 0:
        sys.exit(return_code)
if __name__ == "__main__":
    # script entry point: run the shell client with the default argument parser
    run()