#####################################################################################
#
# Copyright (c) typedef int GmbH
# SPDX-License-Identifier: EUPL-1.2
#
#####################################################################################
import json
from typing import Any, Dict, Union
import txaio
from autobahn import util
from autobahn.util import hltype, hlval
from autobahn.wamp import auth
from autobahn.wamp.types import Accept, Challenge, Deny, HelloDetails, TransportDetails
from txaio import make_logger
from crossbar.interfaces import IPendingAuth, IRealmContainer
from crossbar.router.auth.pending import PendingAuth
__all__ = ("PendingAuthWampCra",)
[docs]
class PendingAuthWampCra(PendingAuth):
"""
Pending WAMP-CRA authentication.
"""
def __init__(
self,
pending_session_id: int,
transport_details: TransportDetails,
realm_container: IRealmContainer,
config: Dict[str, Any],
):
super(PendingAuthWampCra, self).__init__(
pending_session_id,
transport_details,
realm_container,
config,
)
# filled with what the client requested originally (if any)
# The signature we expect the client to send in AUTHENTICATE.
[docs]
def _compute_challenge(self, user):
"""
Returns: challenge, signature
"""
challenge_obj = {
"authid": self._authid,
"authrole": self._authrole,
"authmethod": self._authmethod,
"authprovider": self._authprovider,
"session": self._session_details["session"],
"nonce": util.newid(64),
"timestamp": util.utcnow(),
}
challenge: str = json.dumps(challenge_obj, ensure_ascii=False)
secret = user["secret"].encode("utf8")
# extra data to send to client in CHALLENGE
extra = {"challenge": challenge}
# when using salted passwords, provide the client with
# the salt and then PBKDF2 parameters used
if "salt" in user and "iterations" in user and "keylen" in user:
extra["salt"] = user["salt"]
extra["iterations"] = user["iterations"]
extra["keylen"] = user["keylen"]
secret = auth.derive_key(secret, extra["salt"], extra["iterations"], extra["keylen"])
signature = auth.compute_wcs(secret, challenge.encode("utf8")).decode("ascii")
return extra, signature
[docs]
def hello(self, realm: str, details: HelloDetails) -> Union[Accept, Deny, Challenge]:
# remember the realm the client requested to join (if any)
self._realm = realm
# remember the authid the client wants to identify as (if any)
self._authid = details.authid
# define local helpers
if self._config["type"] in ["dynamic", "function"]:
def on_authenticate_ok(_principal):
_error = self._assign_principal(_principal)
if _error:
return _error
# now compute CHALLENGE.Extra and signature expected
_extra, self._signature = self._compute_challenge(_principal)
return Challenge(self._authmethod, _extra)
def on_authenticate_error(err):
return self._marshal_dynamic_authenticator_error(err)
# use static principal database from configuration
if self._config["type"] == "static":
self._authprovider = "static"
if self._authid in self._config.get("users", {}):
principal = self._config["users"][self._authid]
error = self._assign_principal(principal)
if error:
return error
# now compute CHALLENGE.Extra and signature as
# expected for WAMP-CRA
extra, self._signature = self._compute_challenge(principal)
return Challenge(self._authmethod, extra)
else:
return Deny(message='no principal with authid "{}" exists'.format(details.authid))
# use configured procedure to dynamically get a ticket for the principal
elif self._config["type"] == "dynamic":
self._authprovider = "dynamic"
init_d = txaio.as_future(self._init_dynamic_authenticator)
def init(result):
if result:
return result
self._session_details["authmethod"] = self._authmethod # from AUTHMETHOD, via base
self._session_details["authid"] = details.authid
self._session_details["authrole"] = details.authrole
self._session_details["authextra"] = details.authextra
d = self._authenticator_session.call(self._authenticator, realm, details.authid, self._session_details)
d.addCallbacks(on_authenticate_ok, on_authenticate_error)
return d
init_d.addBoth(init)
return init_d
elif self._config["type"] == "function":
self._authprovider = "function"
init_d = txaio.as_future(self._init_function_authenticator)
def init(result):
if result:
return result
self._session_details["authmethod"] = self._authmethod # from AUTHMETHOD, via base
self._session_details["authid"] = details.authid
self._session_details["authrole"] = details.authrole
self._session_details["authextra"] = details.authextra
auth_d = txaio.as_future(self._authenticator, realm, details.authid, self._session_details)
auth_d.addCallbacks(on_authenticate_ok, on_authenticate_error)
return auth_d
init_d.addBoth(init)
return init_d
else:
# should not arrive here, as config errors should be caught earlier
return Deny(
message='invalid authentication configuration (authentication type "{}" is unknown)'.format(
self._config["type"]
)
)
[docs]
def authenticate(self, signature: str) -> Union[Accept, Deny]:
if signature == self._signature:
# signature was valid: accept the client
return self._accept()
else:
# signature was invalid: deny the client
self.log.warn(
"{func}: WAMP-CRA client signature is invalid (expected {expected} but got {signature})",
func=hltype(self.authenticate),
expected=hlval(self._signature),
signature=hlval(signature, color="red"),
)
return Deny(message="WAMP-CRA client signature is invalid")
IPendingAuth.register(PendingAuthWampCra)