Source code for crossbar.router.auth.tls
#####################################################################################
#
# Copyright (c) typedef int GmbH
# SPDX-License-Identifier: EUPL-1.2
#
#####################################################################################
from typing import Any, Dict
import txaio
from autobahn.wamp.types import Deny, TransportDetails
from txaio import make_logger
from crossbar.interfaces import IPendingAuth, IRealmContainer
from crossbar.router.auth.pending import PendingAuth
__all__ = ("PendingAuthTLS",)
[docs]
class PendingAuthTLS(PendingAuth):
"""
Pending WAMP-TLS authentication.
"""
def __init__(
self,
pending_session_id: int,
transport_details: TransportDetails,
realm_container: IRealmContainer,
config: Dict[str, Any],
):
super(PendingAuthTLS, self).__init__(
pending_session_id,
transport_details,
realm_container,
config,
)
# https://tools.ietf.org/html/rfc5056
# https://tools.ietf.org/html/rfc5929
# https://www.ietf.org/proceedings/90/slides/slides-90-uta-0.pdf
[docs]
self._channel_id = (
transport_details.channel_id.get("tls-unique", None) if transport_details.channel_id else None
)
[docs]
self._peer_cert = transport_details.peer_cert
# for static-mode, the config has principals as a dict indexed
# by authid, but we need the reverse map: cert-sha1 -> principal
[docs]
self._cert_sha1_to_principal = None
if self._config["type"] == "static":
self._cert_sha1_to_principal = {}
if "principals" in self._config:
for authid, principal in self._config["principals"].items():
self._cert_sha1_to_principal[principal["certificate-sha1"]] = {
"authid": authid,
"role": principal["role"],
}
[docs]
def hello(self, realm, details):
# we must have a client TLS certificate to continue
if not self._peer_cert:
return Deny(message="client did not send a TLS client certificate")
# remember the realm the client requested to join (if any)
self._realm = realm
# remember the authid the client wants to identify as (if any)
self._authid = details.authid
# use static principal database from configuration
if self._config["type"] == "static":
self._authprovider = "static"
client_cert_sha1 = self._peer_cert["sha1"]
if client_cert_sha1 in self._cert_sha1_to_principal:
principal = self._cert_sha1_to_principal[client_cert_sha1]
error = self._assign_principal(principal)
if error:
return error
return self._accept()
else:
return Deny(message='no principal with authid "{}" exists'.format(client_cert_sha1))
# use configured procedure to dynamically get a ticket for the principal
elif self._config["type"] == "dynamic":
self._authprovider = "dynamic"
init_d = txaio.as_future(self._init_dynamic_authenticator)
def init(result):
if result:
return result
self._session_details["authmethod"] = self._authmethod # from AUTHMETHOD, via base
self._session_details["authextra"] = details.authextra
d = self._authenticator_session.call(self._authenticator, realm, details.authid, self._session_details)
def on_authenticate_ok(_principal):
_error = self._assign_principal(_principal)
if _error:
return _error
# FIXME: not sure about this .. TLS is a transport-level auth mechanism .. so forward
# self._transport._authid = self._authid
# self._transport._authrole = self._authrole
# self._transport._authmethod = self._authmethod
# self._transport._authprovider = self._authprovider
# self._transport._authextra = self._authextra
return self._accept()
def on_authenticate_error(err):
return self._marshal_dynamic_authenticator_error(err)
d.addCallbacks(on_authenticate_ok, on_authenticate_error)
return d
init_d.addBoth(init)
return init_d
else:
# should not arrive here, as config errors should be caught earlier
return Deny(
message='invalid authentication configuration (authentication type "{}" is unknown)'.format(
self._config["type"]
)
)
[docs]
def authenticate(self, signature):
# should not arrive here!
raise Exception("internal error (WAMP-TLS does not implement AUTHENTICATE)")
IPendingAuth.register(PendingAuthTLS)