#####################################################################################
#
# Copyright (c) typedef int GmbH
# SPDX-License-Identifier: EUPL-1.2
#
#####################################################################################
import binascii
import os
from pprint import pformat
from typing import Any, Dict, Optional, Union
import nacl
import txaio
import web3
from autobahn import util
from autobahn.util import hlid, hltype, hlval
from autobahn.wamp.exception import ApplicationError
from autobahn.wamp.message import identify_realm_name_category
from autobahn.wamp.types import Accept, Challenge, Deny, HelloDetails, TransportDetails
from nacl.exceptions import BadSignatureError
from nacl.signing import VerifyKey
from twisted.internet.defer import Deferred
from xbr import EIP712AuthorityCertificate, parse_certificate_chain
from crossbar.interfaces import IPendingAuth, IRealmContainer
from crossbar.router.auth.pending import PendingAuth
__all__ = (
"PendingAuthCryptosign",
"PendingAuthCryptosignProxy",
)
[docs]
class PendingAuthCryptosign(PendingAuth):
"""
Pending Cryptosign authentication.
"""
[docs]
log = txaio.make_logger()
[docs]
AUTHMETHOD = "cryptosign"
def __init__(
self,
pending_session_id: int,
transport_details: TransportDetails,
realm_container: IRealmContainer,
config: Dict[str, Any],
):
super(PendingAuthCryptosign, self).__init__(
pending_session_id,
transport_details,
realm_container,
config,
)
# https://tools.ietf.org/html/rfc5056
# https://tools.ietf.org/html/rfc5929
# https://www.ietf.org/proceedings/90/slides/slides-90-uta-0.pdf
[docs]
self._channel_id = (
transport_details.channel_id.get("tls-unique", None) if transport_details.channel_id else None
)
[docs]
self._verify_key: Optional[VerifyKey] = None
[docs]
self._challenge: Optional[bytes] = None
[docs]
self._expected_signed_message: Optional[bytes] = None
# map `pubkey -> authid` from `config['principals']`, this is to allow clients to
# authenticate without specifying an authid
[docs]
self._pubkey_to_authid = None
# map `realm_name -> trustroot` from `config['trustroots']`
[docs]
self._realms_to_trustroots = None
if self._config["type"] == "static":
if "principals" in self._config:
self._pubkey_to_authid = {}
for authid, principal in self._config.get("principals", {}).items():
for pubkey in principal["authorized_keys"]:
self._pubkey_to_authid[pubkey] = authid
self.log.info(
"{func} using principals ({pubkeys_cnt} pubkeys loaded)",
pubkeys_cnt=hlval(len(self._pubkey_to_authid), color="green"),
func=hltype(PendingAuthCryptosign.__init__),
)
elif "trustroots" in self._config:
self._realms_to_trustroots = {}
for _realm, _trustroot in self._config["trustroots"].items():
_realm_category = identify_realm_name_category(_realm)
if _realm_category == "standalone":
_trustroot_fn = os.path.join(
self._realm_container.config.extra.cbdir, _trustroot["certificate"]
) # noqa
self._realms_to_trustroots[_realm] = EIP712AuthorityCertificate.load(_trustroot_fn)
elif _realm_category in ["eth", "ens", "reverse_ens"]:
assert False, "FIXME-" * 10
else:
assert False, 'invalid realm name "{}" in static cryptosign configuration'.format(_realm)
else:
assert False, (
'neither "principals" nor "trustroots" attribute found in static cryptosign configuration'
)
[docs]
def _compute_challenge(self, requested_channel_binding: Optional[str]) -> Dict[str, Any]:
self._challenge = os.urandom(32)
if self._channel_id and requested_channel_binding == "tls-unique":
self._expected_signed_message = util.xor(self._challenge, self._channel_id)
else:
self._expected_signed_message = self._challenge
extra = {
"challenge": binascii.b2a_hex(self._challenge).decode(),
"channel_binding": requested_channel_binding,
}
self.log.info(
"{func}::_compute_challenge(channel_binding={channel_binding})[channel_id={channel_id}] -> extra=\n{extra}",
func=hltype(self.hello),
channel_id=hlid("0x" + binascii.b2a_hex(self._channel_id).decode()) if self._channel_id else None,
channel_binding=hlval('"' + requested_channel_binding + '"')
if requested_channel_binding is not None
else None,
extra=pformat(extra),
)
return extra
[docs]
def hello(self, realm: str, details: HelloDetails) -> Union[Accept, Deny, Challenge]:
self.log.info(
'{func}::hello(realm="{realm}", details.authid="{authid}", details.authrole="{authrole}", '
'details.authextra="{authextra}")',
func=hltype(self.hello),
realm=hlid(realm),
authid=hlid(details.authid),
authrole=hlid(details.authrole),
authextra=details.authextra,
)
# the channel binding requested by the client authenticating
requested_channel_binding = details.authextra.get("channel_binding", None) if details.authextra else None
if requested_channel_binding is not None and requested_channel_binding not in ["tls-unique"]:
return Deny(message='invalid channel binding type "{}" requested'.format(requested_channel_binding))
else:
self.log.info(
"WAMP-cryptosign CHANNEL BINDING requested: channel_binding={channel_binding}, channel_id={channel_id}",
channel_binding=requested_channel_binding,
channel_id=self._channel_id,
)
# remember the realm the client requested to join (if any)
self._realm = realm
# remember the authid the client wants to identify as (if any)
self._authid = details.authid
# get trustroot presented by the client
client_trustroot = details.authextra.get("trustroot", None) if details.authextra else None
if client_trustroot:
client_trustroot_name_category = identify_realm_name_category(client_trustroot)
if client_trustroot_name_category not in ["eth", "ens", "reverse_ens"]:
return Deny(message='invalid client trustroot "{}" provided'.format(client_trustroot))
self.log.info(
'{func} using client trustroot {trustroot_name_category} "{trustroot}" from client HELLO',
trustroot=hlid(client_trustroot),
trustroot_name_category=hlval(client_trustroot_name_category, color="green"),
func=hltype(self.hello),
)
# get certificates presented by the client
client_certificates = details.authextra.get("certificates", None) if details.authextra else None
if client_certificates:
if not isinstance(client_certificates, list):
return Deny(message="invalid type {} for client certificates".format(type(client_certificates)))
for cc_i, cc_and_sig in enumerate(client_certificates):
cc_hash, cc, cc_sig = cc_and_sig
if not isinstance(cc, dict):
return Deny(
message="invalid type {} for certificate {} in client certificates".format(type(cc), cc_i)
)
client_certificates = parse_certificate_chain(client_certificates)
self.log.info(
"{func} using {cnt_cc} client certificates from client HELLO:\n{client_certificates}",
cnt_cc=hlval(len(client_certificates), color="green"),
client_certificates=client_certificates,
func=hltype(self.hello),
)
if self._config["type"] == "static":
self._authprovider = "static"
# get client's pubkey, if it was provided in authextra
pubkey = None
if details.authextra and "pubkey" in details.authextra:
pubkey = details.authextra["pubkey"]
# use trustroots configured (if trustroots are indeed configured and the client provided
# a certificate chain to verify)
if client_certificates and realm in self._realms_to_trustroots:
# root CA configured as trustroot for realm
root_ca_cert = self._realms_to_trustroots[realm]
if client_trustroot != web3.Web3.toChecksumAddress(root_ca_cert.issuer):
return Deny(
message='trustroot {} provided by client for realm "{}" does not match '
"root CA with issuer {} configured for that realm".format(
client_trustroot, realm, web3.Web3.toChecksumAddress(root_ca_cert.issuer)
)
)
# trustroot to consider is the issuer of the last certificate (the root CA cert) in
# the certificate chain provided by the client
trustroot = client_certificates[-1].issuer
if trustroot != root_ca_cert.issuer:
return Deny(
message="trustroot {} provided by client in last certificate of certificate chain does not match "
'root CA with issuer {} configured for realm "{}"'.format(
web3.Web3.toChecksumAddress(trustroot),
realm,
web3.Web3.toChecksumAddress(root_ca_cert.issuer),
)
)
principal = {
# use delegate address as synthetic authid
"authid": web3.Web3.toChecksumAddress(client_certificates[0].delegate),
# FIXME: the authrole somehow needs to be configurable
"role": "user",
}
# use static principal database from configuration
elif "principals" in self._config:
# if the client provides its public key, that's enough to identify,
# and we can infer the authid from that. BUT: that requires that
# there is a 1:1 relation between authids and pubkeys !! see below (*)
if self._authid is None:
if pubkey:
# we do a naive search, but that is ok, since "static mode" is from
# node configuration, and won't contain a lot principals anyway
for _authid, _principal in self._config.get("principals", {}).items():
if pubkey in _principal["authorized_keys"]:
# (*): this is necessary to detect multiple authid's having the same pubkey
# in which case we couldn't reliably map the authid from the pubkey
if self._authid is None:
self._authid = _authid
else:
return Deny(
message="cannot infer client identity from pubkey: multiple authids "
"in principal database have this pubkey"
)
if self._authid is None:
return Deny(
message="cannot identify client: no authid requested and no principal found "
"for provided extra.pubkey"
)
else:
return Deny(message="cannot identify client: no authid requested and no extra.pubkey provided")
principals = self._config.get("principals", {})
if self._authid in principals:
principal = principals[self._authid]
if pubkey and (pubkey not in principal["authorized_keys"]):
self.log.warn(
'extra.pubkey {pubkey} provided does not match any one of authorized_keys for the principal [func="{func}"]:\n{principals}',
func=hltype(self.hello),
realm=hlid(realm),
authid=hlid(details.authid),
pubkey=hlval(pubkey),
principals=pformat(principals),
)
return Deny(
message="extra.pubkey provided does not match any one of authorized_keys for the principal"
)
else:
self.log.warn(
'no principal with authid "{authid}" exists in principals for realm "{realm}" [func="{func}"]:\n{principals}',
func=hltype(self.hello),
realm=hlid(realm),
authid=hlid(self._authid),
principals=pformat(principals),
)
return Deny(message='no principal with authid "{}" exists'.format(self._authid))
# neither "principals" nor "trustroots" configured
else:
return Deny(
message='neither "principals", nor "trustroots" configured (and client certificates provided)'
)
error = self._assign_principal(principal)
if error:
return error
self._verify_key = VerifyKey(pubkey, encoder=nacl.encoding.HexEncoder)
extra = self._compute_challenge(requested_channel_binding)
if "challenge" in details.authextra and details.authextra["challenge"]:
challenge_raw = binascii.a2b_hex(details.authextra["challenge"])
if requested_channel_binding == "tls-unique":
data = util.xor(challenge_raw, self._channel_id)
else:
data = challenge_raw
# sign the client challenge with our node private Ed25519 key on node controller
signature_d = self._realm_container.get_controller_session().call("crossbar.sign", data)
def _on_sign_ok(signature):
# return the concatenation of the signature and the message signed (96 bytes)
extra["signature"] = binascii.b2a_hex(signature).decode() + binascii.b2a_hex(data).decode()
signature_d.addCallback(_on_sign_ok)
# get node public key from node controller
pubkey_d = self._realm_container.get_controller_session().call("crossbar.get_public_key")
def _on_pubkey_ok(pubkey):
# return router public key
extra["pubkey"] = pubkey
pubkey_d.addCallback(_on_pubkey_ok)
# FIXME: add router certificate
# FIXME: add router trustroot
d = txaio.gather([signature_d, pubkey_d])
def _on_final(_):
return Challenge(self._authmethod, extra)
d.addCallback(_on_final)
return d
else:
return Challenge(self._authmethod, extra)
elif self._config["type"] == "dynamic":
self._authprovider = "dynamic"
d = Deferred()
d1 = txaio.as_future(self._init_dynamic_authenticator)
def initialized(error=None):
if error:
d.errback(error)
return
self._session_details["authmethod"] = self._authmethod # from AUTHMETHOD, via base
self._session_details["authid"] = details.authid
self._session_details["authrole"] = details.authrole
self._session_details["authextra"] = details.authextra
self.log.debug(
'Calling dynamic authenticator [proc="{proc}", realm="{realm}", session={session}, authid="{authid}", authrole="{authrole}"]',
proc=self._authenticator,
realm=self._authenticator_session._realm,
session=self._authenticator_session._session_id,
authid=self._authenticator_session._authid,
authrole=self._authenticator_session._authrole,
)
d2 = self._authenticator_session.call(
self._authenticator, realm, details.authid, self._session_details
)
def on_authenticate_ok(principal):
self.log.debug(
'{klass}.hello(realm="{realm}", details={details}) -> on_authenticate_ok(principal={principal})',
klass=self.__class__.__name__,
realm=realm,
details=details,
principal=principal,
)
_error = self._assign_principal(principal)
if _error:
d.callback(_error)
return
self._verify_key = VerifyKey(principal["pubkey"], encoder=nacl.encoding.HexEncoder)
extra = self._compute_challenge(requested_channel_binding)
challenge = Challenge(self._authmethod, extra)
d.callback(challenge)
def on_authenticate_error(_error):
self.log.debug(
'{klass}.hello(realm="{realm}", details={details}) -> on_authenticate_error(error={error})',
klass=self.__class__.__name__,
realm=realm,
details=details,
error=_error,
)
try:
d.callback(self._marshal_dynamic_authenticator_error(_error))
except:
self.log.failure()
d.callback(_error)
d2.addCallbacks(on_authenticate_ok, on_authenticate_error)
return d2
def initialized_error(fail):
self.log.failure("Internal error (3): {log_failure.value}", failure=fail)
d.errback(fail)
d1.addCallbacks(initialized, initialized_error)
return d
elif self._config["type"] == "function":
self._authprovider = "function"
init_d = txaio.as_future(self._init_function_authenticator)
def init(_error):
if _error:
return _error
self._session_details["authmethod"] = self._authmethod # from AUTHMETHOD, via base
self._session_details["authid"] = details.authid
self._session_details["authrole"] = details.authrole
self._session_details["authextra"] = details.authextra
auth_d = txaio.as_future(self._authenticator, realm, details.authid, self._session_details)
def on_authenticate_ok(principal):
self.log.debug(
'{klass}.hello(realm="{realm}", details={details}) -> on_authenticate_ok(principal={principal})',
klass=self.__class__.__name__,
realm=realm,
details=details,
principal=principal,
)
_error = self._assign_principal(principal)
if _error:
return _error
self._verify_key = VerifyKey(principal["pubkey"], encoder=nacl.encoding.HexEncoder)
_extra = self._compute_challenge(requested_channel_binding)
return Challenge(self._authmethod, _extra)
def on_authenticate_error(err):
self.log.warn(
'{klass}.hello(realm="{realm}", details={details}) -> on_authenticate_error(err={err})',
klass=self.__class__.__name__,
realm=realm,
details=details,
err=err,
)
try:
return self._marshal_dynamic_authenticator_error(err)
except Exception as e:
_error = ApplicationError.AUTHENTICATION_FAILED
message = "marshalling of function-based authenticator error return failed: {}".format(e)
self.log.warn("{klass}.hello.on_authenticate_error() - {msg}", msg=message)
return Deny(_error, message)
auth_d.addCallbacks(on_authenticate_ok, on_authenticate_error)
return auth_d
init_d.addBoth(init)
return init_d
else:
# should not arrive here, as config errors should be caught earlier
return Deny(
message='invalid authentication configuration (authentication type "{}" is unknown)'.format(
self._config["type"]
)
)
[docs]
def authenticate(self, signature: str) -> Union[Accept, Deny]:
"""
Verify the signed message sent by the client. With WAMP-cryptosign, this must be 96 bytes (as a string
in HEX encoding): the concatenation of the Ed25519 signature (64 bytes) and the 32 bytes we sent
as a challenge previously, XORed with the 32 bytes transport channel ID (if available).
"""
try:
if not isinstance(signature, str):
return Deny(message="invalid type {} for signed message".format(type(signature)))
try:
signed_message = binascii.a2b_hex(signature)
except TypeError:
return Deny(message="signed message is invalid (not a HEX encoded string)")
if len(signed_message) != 96:
return Deny(
message="signed message has invalid length (was {}, but should have been 96)".format(
len(signed_message)
)
)
# now verify the signed message versus the client public key
assert self._verify_key
try:
message = self._verify_key.verify(signed_message)
except BadSignatureError:
return Deny(message="signed message has invalid signature")
# and check that the message signed by the client is really what we expect
assert self._expected_signed_message
if message != self._expected_signed_message:
return Deny(
message="message signed is bogus [got 0x{}, expected 0x{}]".format(
binascii.b2a_hex(message).decode(), binascii.b2a_hex(self._expected_signed_message).decode()
)
)
# signature was valid _and_ the message that was signed is equal to
# what we expected => accept the client
return self._accept()
# should not arrive here, but who knows
except Exception as e:
self.log.failure()
return Deny(message="INTERNAL ERROR ({})".format(e))
[docs]
class PendingAuthCryptosignProxy(PendingAuthCryptosign):
"""
Pending Cryptosign authentication with additions for proxy
"""
[docs]
log = txaio.make_logger()
[docs]
AUTHMETHOD = "cryptosign-proxy"
[docs]
def hello(self, realm, details):
self.log.debug(
"{klass}.hello(realm={realm}, details={details}) ...",
klass=self.__class__.__name__,
realm=realm,
details=details,
)
if not details.authextra:
return Deny(message="missing required details.authextra")
for attr in ["proxy_authid", "proxy_authrole", "proxy_realm"]:
if attr not in details.authextra:
return Deny(message="missing required attribute {} in details.authextra".format(attr))
if details.authrole is None:
details.authrole = details.authextra.get("proxy_authrole", None)
if details.authid is None:
details.authid = details.authextra.get("proxy_authid", None)
# with authenticators of type "*-proxy", the principal returned in authenticating the
# incoming backend connection is ignored ..
f = txaio.as_future(super(PendingAuthCryptosignProxy, self).hello, realm, details)
def assign(res):
if isinstance(res, Deny):
return res
# the incoming backend connection from the proxy frontend is authenticated as the principal
# the frontend proxy has _already_ authenticated the actual client (before even connecting and
# authenticating to the backend here)
principal = {
"realm": details.authextra["proxy_realm"],
"authid": details.authextra["proxy_authid"],
"role": details.authextra["proxy_authrole"],
# the authextra intended for the principal is forwarded from the proxy
"extra": details.authextra.get("proxy_authextra", None),
}
self._assign_principal(principal)
self.log.debug(
"{klass}.hello(realm={realm}, details={details}) -> principal={principal}",
klass=self.__class__.__name__,
realm=realm,
details=details,
principal=principal,
)
return self._accept()
def error(_err):
return Deny("Internal error: {}".format(_err))
txaio.add_callbacks(f, assign, error)
return f
IPendingAuth.register(PendingAuthCryptosign)