#####################################################################################
#
# Copyright (c) typedef int GmbH
# SPDX-License-Identifier: EUPL-1.2
#
#####################################################################################
import txaio
txaio.use_twisted()
from collections import OrderedDict
from autobahn import util
from autobahn.twisted.util import peer2str
from autobahn.wamp import message, role
from autobahn.wamp.message import _URI_PAT_LOOSE_EMPTY, _URI_PAT_LOOSE_LAST_EMPTY, _URI_PAT_LOOSE_NON_EMPTY
from autobahn.wamp.serializer import (
CBORObjectSerializer,
JsonObjectSerializer,
MsgPackObjectSerializer,
UBJSONObjectSerializer,
)
from autobahn.wamp.types import TransportDetails
from autobahn.websocket.utf8validator import Utf8Validator
from pytrie import StringTrie
from twisted.internet.defer import Deferred, inlineCallbacks, returnValue, succeed
from twisted.internet.interfaces import IHandshakeListener, ISSLTransport
from twisted.internet.protocol import Factory, Protocol
from txaio import make_logger
from zope.interface import implementer
from crossbar.bridge.mqtt.tx import MQTTServerTwistedProtocol
from crossbar.router.session import RouterSession
[docs]
# Module-level UTF-8 validator instance (autobahn ``Utf8Validator``).
# NOTE(review): not referenced anywhere in the visible part of this file.
_validator = Utf8Validator()
[docs]
def _mqtt_topicfilter_to_wamp(topic):
    """
    Convert a MQTT topic filter, as used in MQTT Subscribe (and hence potentially
    containing the special characters "+" and "#"), to a WAMP URI and a match policy.

    :param topic: The MQTT topic filter to convert.
    :returns: A pair ``(uri, match)`` where ``match`` is one of ``"exact"``,
        ``"prefix"`` or ``"wildcard"``.
    :raises TypeError: If ``topic`` is not a string, if it uses wildcard characters
        in a way that cannot be mapped to WAMP, or if the converted URI does not
        match the WAMP URI pattern for the selected match policy.
    """
    if not isinstance(topic, str):
        raise TypeError('invalid type "{}" for MQTT topic filter'.format(type(topic)))

    has_single_level = "+" in topic
    has_multi_level = "#" in topic

    if has_single_level:
        # This is a restriction following from WAMP: wildcard and prefix
        # matching cannot be combined in one subscription.
        if has_multi_level:
            raise TypeError(
                'MQTT topic filter "{}" contains both single-level and multi-level wildcards, and this cannot be mapped to WAMP'.format(
                    topic
                )
            )

        # A "+" must occupy a whole topic level on its own.
        if any(level != "+" and "+" in level for level in topic.split("/")):
            raise TypeError(
                'invalid MQTT filter "{}": single-level wildcard characters must stand on their own in components'.format(
                    topic
                )
            )

        match = "wildcard"
        uri = topic.replace("+", "")

    elif has_multi_level:
        # There can be only one occurrence of "#", and it must be the last character ..
        if topic.index("#") != len(topic) - 1:
            raise TypeError(
                'invalid MQTT topic filter "{}": # multi-level wildcard character must only appear as last character'.format(
                    topic
                )
            )

        # .. and it must either stand alone or directly follow a "/" separator.
        if len(topic) > 1 and topic[-2] != "/":
            raise TypeError(
                'invalid MQTT topic filter "{}": # multi-level wildcard character must either appear solely, or be preceded by a / topic separator character'.format(
                    topic
                )
            )

        match = "prefix"
        uri = topic[:-1]

    else:
        match = "exact"
        uri = topic

    # MQTT spec 4.7.1.1: "The use of the topic level separator is significant when either
    # of the two wildcard characters is encountered in the Topic Filters specified by
    # subscribing Clients."
    #
    # FIXME: However, we still cannot leave the "/" character untouched and uninterpreted
    # when no "+" or "#" was encountered, so the MQTT level separator "/" is always
    # replaced with the WAMP level separator ".".
    uri = uri.replace("/", ".")

    # Each match policy has its own (loose) URI pattern the result must satisfy.
    uri_pattern = {
        "exact": _URI_PAT_LOOSE_NON_EMPTY,
        "prefix": _URI_PAT_LOOSE_LAST_EMPTY,
        "wildcard": _URI_PAT_LOOSE_EMPTY,
    }[match]

    if not uri_pattern.match(uri):
        raise TypeError(
            'invalid WAMP URI "{}" (match="{}") after conversion from MQTT topic filter "{}"'.format(
                uri, match, topic
            )
        )

    return uri, match
[docs]
def _mqtt_topicname_to_wamp(topic):
    """
    Convert a MQTT topic name, as used in MQTT Publish, to a WAMP URI.

    :param topic: The MQTT topic name to convert.
    :returns: The topic with every "/" level separator replaced by ".".
    :raises TypeError: If ``topic`` is not a string, contains one of the wildcard
        characters "#" or "+", or does not convert to a non-empty (loose) WAMP URI.
    """
    if not isinstance(topic, str):
        raise TypeError('invalid type "{}" for MQTT topic name'.format(type(topic)))

    # Wildcards are only allowed in topic filters, never in topic names.
    if any(wildcard in topic for wildcard in "#+"):
        raise TypeError('invalid MQTT topic name "{}" - contains wildcard characters'.format(topic))

    uri = topic.replace("/", ".")

    if _URI_PAT_LOOSE_NON_EMPTY.match(uri) is None:
        raise TypeError('invalid WAMP URI "{}" after conversion from MQTT topic name "{}"'.format(uri, topic))

    return uri
[docs]
def _wamp_topic_to_mqtt(topic):
    """
    Convert a WAMP URI, as used in WAMP Publish, to a MQTT topic.

    :param topic: The WAMP URI to convert.
    :returns: The URI with every "." level separator replaced by "/".
    """
    return topic.replace(".", "/")
[docs]
class WampTransport:
    """
    Minimal transport object that hands every message passed to :meth:`send`
    to a callable, instead of serializing it onto a wire.
    """

    def __init__(self, factory, on_message, real_transport):
        """
        :param factory: Accepted for interface reasons; not used by this class.
        :param on_message: Callable invoked with each message given to :meth:`send`.
        :param real_transport: The underlying transport object, kept as ``self.transport``.
        """
        self.on_message = on_message
        self.transport = real_transport
        self.transport_details = TransportDetails()

        # NOTE(review): this looks like a placeholder value - confirm whether
        # anything reads `_transport_config` from the underlying transport.
        real_transport._transport_config = {"foo": 32}

    def send(self, msg):
        """
        Deliver ``msg`` to the ``on_message`` callable given at construction.
        """
        deliver = self.on_message
        deliver(msg)
@implementer(IHandshakeListener)
class WampMQTTServerProtocol(Protocol):
    """
    Twisted protocol bridging one MQTT client connection onto a WAMP router session.

    Incoming bytes are fed to a ``MQTTServerTwistedProtocol`` which is given this
    object as its handler (presumably it invokes the ``process_*`` methods below).
    MQTT packets are translated into WAMP messages handed to a ``RouterSession``;
    messages that session sends arrive in :meth:`on_message` (via ``WampTransport``)
    and are translated back to MQTT.
    """

    # `self.log` is used throughout this class, so the logger must exist here.
    log = make_logger()

    def __init__(self, reactor):
        """
        :param reactor: The reactor passed on to the MQTT protocol implementation.
        """
        self._mqtt = MQTTServerTwistedProtocol(self, reactor)

        # WAMP Publish request ID -> MQTT packet identifier (publishes with QoS > 0).
        # NOTE(review): nothing in this class reads or clears this mapping.
        self._request_to_packetid = {}

        # Deferred firing with `(accept_conn, session_present)` on Welcome/Abort.
        self._waiting_for_connect = None

        # MQTT packet identifier -> OrderedDict(WAMP request ID -> {"response", "topic"}).
        self._inflight_subscriptions = {}

        # WAMP Subscribe request ID -> MQTT packet identifier.
        self._subrequest_to_mqtt_subrequest = {}

        # MQTT packet identifier -> Deferred that sends the SUBACK when fired.
        self._subrequest_callbacks = {}

        # WAMP subscription ID -> MQTT topic filter the subscription was made for.
        self._topic_lookup = {}

        self._wamp_session = None

    def on_message(self, inc_msg):
        """
        Entry point for messages sent by the WAMP session; never raises.

        ``_on_message`` runs under ``inlineCallbacks`` and reports errors through
        the Deferred it returns, so failures are logged from an errback (a plain
        ``try``/``except`` around the call would not see them).
        """
        try:
            d = self._on_message(inc_msg)
        except Exception:
            self.log.failure()
        else:
            d.addErrback(
                lambda fail: self.log.failure(
                    "WampMQTTServerProtocol.on_message failed: {log_failure.value}", failure=fail
                )
            )

    @inlineCallbacks
    def _on_message(self, inc_msg):
        """
        Translate one WAMP message coming from the router session into the
        corresponding MQTT action.
        """
        self.log.debug("WampMQTTServerProtocol._on_message(inc_msg={inc_msg})", inc_msg=inc_msg)

        if isinstance(inc_msg, message.Challenge):
            # Only ticket authentication is offered (see process_connect).
            assert inc_msg.method == "ticket"
            msg = message.Authenticate(signature=self._pw_challenge)
            del self._pw_challenge
            self._wamp_session.onMessage(msg)

        elif isinstance(inc_msg, message.Welcome):
            self._waiting_for_connect.callback((0, False))

        elif isinstance(inc_msg, message.Abort):
            self._waiting_for_connect.callback((1, False))

        elif isinstance(inc_msg, message.Subscribed):
            # Successful subscription!
            self._resolve_subscription(inc_msg.request, 0, inc_msg.subscription)

        elif isinstance(inc_msg, message.Error) and inc_msg.request_type == message.Subscribe.MESSAGE_TYPE:
            # Failed subscription :(
            self._resolve_subscription(inc_msg.request, 128)

        elif isinstance(inc_msg, message.Unsubscribed):
            # Reply to an Unsubscribe sent from process_unsubscribe; nothing left to do.
            self.log.debug("WAMP unsubscribe request {request} confirmed", request=inc_msg.request)

        elif isinstance(inc_msg, message.Event):
            # The event carries its topic only in some cases; otherwise fall
            # back to the MQTT topic filter recorded for the subscription.
            topic = inc_msg.topic or self._topic_lookup[inc_msg.subscription]
            try:
                payload_format, mapped_topic, payload = yield self.factory.transform_wamp(topic, inc_msg)
            except Exception:
                self.log.failure()
            else:
                self._mqtt.send_publish(mapped_topic, 0, payload, retained=inc_msg.retained or False)

        elif isinstance(inc_msg, message.Goodbye):
            if self._mqtt.transport:
                self._mqtt.transport.loseConnection()
                self._mqtt.transport = None

        else:
            self.log.warn("cannot process unimplemented message: {inc_msg}", inc_msg=inc_msg)

    def _resolve_subscription(self, request, response, subscription=None):
        """
        Record the outcome of one WAMP Subscribe request and fire the SUBACK
        Deferred of its MQTT Subscribe packet once no request is outstanding.

        :param request: The WAMP Subscribe request ID that was answered.
        :param response: The MQTT SUBACK return code to report (0 or 128).
        :param subscription: The WAMP subscription ID on success, else ``None``.
        """
        # The request is answered exactly once, so the mapping entry can go.
        mqtt_id = self._subrequest_to_mqtt_subrequest.pop(request)
        packet_watch = self._inflight_subscriptions[mqtt_id]
        packet_watch[request]["response"] = response

        if subscription is not None:
            self._topic_lookup[subscription] = packet_watch[request]["topic"]

        # -1 marks a request that has not been answered yet.
        if all(x["response"] != -1 for x in packet_watch.values()):
            self._subrequest_callbacks[mqtt_id].callback(None)

    def connectionMade(self, ignore_handshake=False):
        """
        Start bridging right away on non-TLS transports; on TLS transports wait
        for :meth:`handshakeCompleted` unless ``ignore_handshake`` is set.
        """
        if ignore_handshake or not ISSLTransport.providedBy(self.transport):
            self._when_ready()

    def connectionLost(self, reason):
        """
        Tell the WAMP session goodbye when the MQTT connection is gone.
        """
        if self._wamp_session:
            msg = message.Goodbye()
            self._wamp_session.onMessage(msg)
            # Reset rather than `del`, so later attribute reads do not raise.
            self._wamp_session = None

    def handshakeCompleted(self):
        """
        ``IHandshakeListener`` hook: the TLS handshake is done.
        """
        self._when_ready()

    def _when_ready(self):
        """
        Create the router session for this connection and open it on a
        ``WampTransport`` that feeds :meth:`on_message`. Does nothing when a
        session already exists.
        """
        if self._wamp_session:
            return

        self._mqtt.transport = self.transport

        self._wamp_session = RouterSession(self.factory._router_session_factory._routerFactory)
        self._wamp_transport = WampTransport(self.factory, self.on_message, self.transport)
        self._wamp_session.onOpen(self._wamp_transport)
        self._wamp_session._transport_config = self.factory._options

    def process_connect(self, packet):
        """
        Process the initial Connect message from the MQTT client.

        This should return a pair `(accept_conn, session_present)`, where
        `accept_conn` is a return code:

        0: connection accepted
        1-5: connection refused (see MQTT spec 3.2.2.3)

        :param packet: The MQTT Connect packet.
        :returns: A Deferred firing with the pair described above.
        """
        # Connect(client_id='paho/4E23D8C09DD9C6CF2C',
        #         flags=ConnectFlags(username=False,
        #                            password=False,
        #                            will=False,
        #                            will_retain=False,
        #                            will_qos=0,
        #                            clean_session=True,
        #                            reserved=False),
        #         keep_alive=60,
        #         will_topic=None,
        #         will_message=None,
        #         username=None,
        #         password=None)
        self.log.info("WampMQTTServerProtocol.process_connect(packet={packet})", packet=packet)

        # we don't support session resumption: https://github.com/crossbario/crossbar/issues/892
        if not packet.flags.clean_session:
            self.log.warn(
                "denying MQTT connect from {peer}, as the clients wants to resume a session (which we do not support)",
                peer=peer2str(self.transport.getPeer()),
            )
            return succeed((1, False))

        # we won't support QoS 2: https://github.com/crossbario/crossbar/issues/1046
        if packet.flags.will and packet.flags.will_qos not in [0, 1]:
            self.log.warn(
                'denying MQTT connect from {peer}, as the clients wants to provide a "last will" event with QoS {will_qos} (and we only support QoS 0/1 here)',
                peer=peer2str(self.transport.getPeer()),
                will_qos=packet.flags.will_qos,
            )
            return succeed((1, False))

        # this will be resolved when the MQTT connect handshake is completed
        self._waiting_for_connect = Deferred()

        roles = {
            "subscriber": role.RoleSubscriberFeatures(payload_transparency=True, pattern_based_subscription=True),
            "publisher": role.RolePublisherFeatures(payload_transparency=True, x_acknowledged_event_delivery=True),
        }

        realm = self.factory._options.get("realm", None)

        authmethods = []
        authextra = {
            "mqtt": {"client_id": packet.client_id, "will": bool(packet.flags.will), "will_topic": packet.will_topic}
        }

        if ISSLTransport.providedBy(self.transport):
            authmethods.append("tls")

        if packet.username and packet.password:
            authmethods.append("ticket")
            msg = message.Hello(
                realm=realm, roles=roles, authmethods=authmethods, authid=packet.username, authextra=authextra
            )
            # Kept until the router answers with a Challenge (see _on_message).
            self._pw_challenge = packet.password
        else:
            authmethods.append("anonymous")
            msg = message.Hello(
                realm=realm, roles=roles, authmethods=authmethods, authid=packet.client_id, authextra=authextra
            )

        self._wamp_session.onMessage(msg)

        if packet.flags.will:
            # it's unclear from the MQTT spec whether a) the publication of the last will
            # is to happen in-band during "connect", and if it fails, deny the connection,
            # or b) the last will publication happens _after_ "connect", and the connection
            # succeeds regardless whether the last will publication succeeds or not.
            #
            # we opt for b) here!
            #
            # Decorators apply bottom-up: `inlineCallbacks` must wrap the generator
            # function first, and only the wrapped function may be registered as the
            # callback. The other order registers a plain generator function, which
            # never runs and replaces the connect result with a generator object.
            @self._waiting_for_connect.addCallback
            @inlineCallbacks
            def process_will(res):
                # Only register a testament when the connection was accepted.
                if res[0] == 0:
                    self.log.info(
                        "registering MQTT last will on topic {will_topic}", will_topic=packet.will_topic
                    )
                    try:
                        payload_format, mapped_topic, options = yield self.factory.transform_mqtt(
                            packet.will_topic, packet.will_message
                        )
                        msg = message.Call(
                            request=util.id(),
                            procedure="wamp.session.add_testament",
                            args=[
                                mapped_topic,
                                options.get("args", None),
                                options.get("kwargs", None),
                                {
                                    # specify "retain" for when the testament (last will)
                                    # will be auto-published by the broker later
                                    "retain": bool(packet.flags.will_retain)
                                },
                            ],
                        )
                        self._wamp_session.onMessage(msg)
                    except Exception:
                        # Option b): a failing last will must not fail the connect.
                        self.log.failure()

                # Pass the connect result through unchanged.
                returnValue(res)

        return self._waiting_for_connect

    @inlineCallbacks
    def _publish(self, event, acknowledge=None):
        """
        Given a MQTT event, create a WAMP Publish message and
        forward that on the forwarding WAMP session.

        :param event: The MQTT Publish packet.
        :param acknowledge: Passed through as ``acknowledge`` of the WAMP Publish.
        :returns: A Deferred firing with 0 when the message was handed to the
            session, or with ``None`` when the payload transformation failed.
        """
        try:
            payload_format, mapped_topic, options = yield self.factory.transform_mqtt(event.topic_name, event.payload)
        except Exception:
            self.log.failure()
            return

        request = util.id()
        msg = message.Publish(
            request=request,
            topic=mapped_topic,
            exclude_me=False,
            acknowledge=acknowledge,
            retain=event.retain,
            **options,
        )
        self._wamp_session.onMessage(msg)

        if event.qos_level > 0:
            self._request_to_packetid[request] = event.packet_identifier

        returnValue(0)

    def process_publish_qos_0(self, event):
        """
        Forward a QoS 0 MQTT Publish to WAMP (unacknowledged).
        """
        try:
            return self._publish(event)
        except Exception:
            self.log.failure()

    def process_publish_qos_1(self, event):
        """
        Forward a QoS 1 MQTT Publish to WAMP, requesting acknowledgement.
        """
        try:
            return self._publish(event, acknowledge=True)
        except Exception:
            self.log.failure()

    def process_puback(self, event):
        """
        MQTT PubACK: nothing to do.
        """
        return

    def process_pubrec(self, event):
        """
        MQTT PubREC: nothing to do.
        """
        return

    def process_pubrel(self, event):
        """
        MQTT PubREL: nothing to do.
        """
        return

    def process_pubcomp(self, event):
        """
        MQTT PubCOMP: nothing to do.
        """
        return

    def process_subscribe(self, packet):
        """
        Process a MQTT Subscribe packet: issue one WAMP Subscribe per requested
        topic filter and send the SUBACK once every request has been answered.

        :param packet: The MQTT Subscribe packet.
        :raises TypeError: If a topic filter cannot be mapped to WAMP. This is
            raised before anything is sent or registered.
        """
        mqtt_id = packet.packet_identifier

        # Convert all filters up front: a conversion error must not leave
        # half-registered state behind.
        requests = []
        for x in packet.topic_requests:
            topic, match = _mqtt_topicfilter_to_wamp(x.topic_filter)
            self.log.info("process_subscribe -> topic={topic}, match={match}", topic=topic, match=match)

            request_id = util.id()
            msg = message.Subscribe(
                request=request_id,
                topic=topic,
                match=match,
                get_retained=True,
            )
            requests.append((request_id, x.topic_filter, msg))

        packet_watch = OrderedDict()
        d = Deferred()

        @d.addCallback
        def _send_suback(ign):
            self._mqtt.send_suback(mqtt_id, [x["response"] for x in packet_watch.values()])
            del self._inflight_subscriptions[mqtt_id]
            del self._subrequest_callbacks[mqtt_id]

        self._subrequest_callbacks[mqtt_id] = d
        self._inflight_subscriptions[mqtt_id] = packet_watch

        # Register every request as outstanding before sending the first one.
        # The reply may be delivered synchronously from onMessage(), and the
        # SUBACK must only go out when all requests of the packet are settled.
        for request_id, topic_filter, _msg in requests:
            packet_watch[request_id] = {"response": -1, "topic": topic_filter}
            self._subrequest_to_mqtt_subrequest[request_id] = mqtt_id

        for request_id, _topic_filter, msg in requests:
            try:
                self._wamp_session.onMessage(msg)
            except Exception:
                self.log.failure()
                packet_watch[request_id]["response"] = 128
                self._subrequest_to_mqtt_subrequest.pop(request_id, None)

        # When sending failed, no reply will arrive to trigger the SUBACK.
        if not d.called and all(x["response"] != -1 for x in packet_watch.values()):
            d.callback(None)

    def process_unsubscribe(self, packet):
        """
        Process a MQTT Unsubscribe packet: send a WAMP Unsubscribe for every
        subscription recorded for one of the given topic filters.

        Subscriptions are tracked in ``_topic_lookup`` (WAMP subscription ID ->
        MQTT topic filter); unknown topic filters are ignored.

        :param packet: The MQTT Unsubscribe packet.
        :returns: An already-fired Deferred.
        """
        for topic_filter in packet.topics:
            # Collect first: the mapping is modified while unsubscribing.
            subscription_ids = [
                subscription for subscription, known in self._topic_lookup.items() if known == topic_filter
            ]
            for subscription in subscription_ids:
                del self._topic_lookup[subscription]
                msg = message.Unsubscribe(request=util.id(), subscription=subscription)
                self._wamp_session.onMessage(msg)

        return succeed(None)

    def dataReceived(self, data):
        """
        Feed raw bytes from the transport into the MQTT protocol implementation.
        """
        self._mqtt.dataReceived(data)
[docs]
class WampMQTTServerFactory(Factory):
[docs]
protocol = WampMQTTServerProtocol
[docs]
serializers = {
"json": JsonObjectSerializer(),
"msgpack": MsgPackObjectSerializer(),
"cbor": CBORObjectSerializer(),
"ubjson": UBJSONObjectSerializer(),
}
def __init__(self, router_session_factory, config, reactor):
    """
    :param router_session_factory: The router session factory; its
        ``_routerFactory`` attribute is kept as ``self._router_factory``.
    :param config: Configuration mapping; only its ``"options"`` entry is read.
    :param reactor: The reactor handed to every protocol instance built.
    """
    self._router_session_factory = router_session_factory
    self._router_factory = router_session_factory._routerFactory

    options = config.get("options", {})
    self._options = options
    self._realm = options.get("realm", None)
    self._reactor = reactor

    # Trie holding the payload mappings configured under "payload_mapping".
    self._payload_mapping = StringTrie()

    # NOTE(review): `_set_payload_format` is defined outside the visible part
    # of this class; presumably it fills `self._payload_mapping` - confirm.
    configured_mappings = options.get("payload_mapping", {})
    for topic_key, payload_map in configured_mappings.items():
        self._set_payload_format(topic_key, payload_map)
[docs]
def buildProtocol(self, addr):
    """
    Build a protocol instance for a new connection.

    :param addr: The peer address (not used).
    :returns: A new ``self.protocol`` instance, created with the factory's
        reactor and with its ``factory`` attribute set to this factory.
    """
    proto = self.protocol(self._reactor)
    proto.factory = self
    return proto
@inlineCallbacks
@inlineCallbacks
@inlineCallbacks
@inlineCallbacks