# Source code for crossbar.bridge.mqtt.protocol

#####################################################################################
#
#  Copyright (c) typedef int GmbH
#  SPDX-License-Identifier: EUPL-1.2
#
#####################################################################################

import bitstring

from ._events import (
    ConnACK,
    Connect,
    Disconnect,
    Failure,
    ParseFailure,
    PingREQ,
    PingRESP,
    PubACK,
    PubCOMP,
    Publish,
    PubREC,
    PubREL,
    SubACK,
    Subscribe,
    UnsubACK,
    Unsubscribe,
)

# Names exported by ``from <this module> import *``.
# NOTE(review): MQTTClientParser, defined further down in this module, is not
# listed here -- confirm that leaving it out of the public API is intentional.
__all__ = [
    "MQTTParser",
]


class _NeedMoreData(Exception):
    """
    Signal that the buffered bytes end before the packet length field does.

    Raised while decoding a packet header; the caller is expected to wait for
    further input and retry.
    """


# Parser states (the values stored in MQTTParser._state).
WAITING_FOR_NEW_PACKET = 0  # no packet in progress; the next bytes start a header
COLLECTING_REST_OF_PACKET = 1  # header decoded; waiting for the announced body bytes
PROTOCOL_VIOLATION = 2  # a Failure was emitted; all further input is dropped

# MQTT control packet type codes. These are compared against the 4-bit packet
# type that _parse_header() reads from the start of every packet, and are used
# as keys of the handler tables below.
P_CONNECT = 1
P_CONNACK = 2
P_PUBLISH = 3
P_PUBACK = 4
P_PUBREC = 5
P_PUBREL = 6
P_PUBCOMP = 7
P_SUBSCRIBE = 8
P_SUBACK = 9
P_UNSUBSCRIBE = 10
P_UNSUBACK = 11
P_PINGREQ = 12
P_PINGRESP = 13
P_DISCONNECT = 14

# Handler table used by MQTTParser (its default ``_packet_handlers``): maps a
# packet type code to the event class whose ``deserialise(flags, data)`` is
# called to build the event. A packet type that is not a key of the active
# table is reported as an "Unimplemented packet type" Failure.
server_packet_handlers = {
    P_CONNECT: Connect,
    P_PUBLISH: Publish,
    P_PUBACK: PubACK,
    P_SUBSCRIBE: Subscribe,
    P_UNSUBSCRIBE: Unsubscribe,
    P_PINGREQ: PingREQ,
    P_PUBREL: PubREL,
    P_PUBREC: PubREC,
    P_PUBCOMP: PubCOMP,
    P_DISCONNECT: Disconnect,
}

# Handler table used by MQTTClientParser: maps a packet type code to the event
# class whose ``deserialise(flags, data)`` is called to build the event.
client_packet_handlers = {
    P_CONNACK: ConnACK,
    P_PUBLISH: Publish,
    P_PUBACK: PubACK,
    P_SUBACK: SubACK,
    P_UNSUBACK: UnsubACK,
    P_PINGRESP: PingRESP,
    P_PUBREC: PubREC,
    P_PUBREL: PubREL,
    P_PUBCOMP: PubCOMP,
}


def _parse_header(data):
    """
    Decode an MQTT fixed header from the current read position of ``data``.

    The header consists of a 4-bit packet type, four flag bits and a
    variable-length "remaining length" field of one to four bytes. Each length
    byte contributes its low 7 bits (least significant group first); the high
    bit says whether another length byte follows.

    :param data: Bit stream positioned at the start of a packet. It must offer
        a ``read(fmt)`` method understanding the ``"uint:4"``, ``"bool"`` and
        ``"uint:8"`` format strings.
    :returns: ``(packet_type, flags, remaining_length)`` where ``flags`` is a
        4-tuple of bools in the order they appear in the stream.
    :raises _NeedMoreData: If the stream ends inside the length field. The
        read position is left wherever decoding stopped, so the caller has to
        rewind before retrying.
    :raises ParseFailure: If the length field is still not terminated after
        four bytes.
    """
    # New packet
    packet_type = data.read("uint:4")
    flags = (data.read("bool"), data.read("bool"), data.read("bool"), data.read("bool"))

    # The MQTT spec caps the length field at four bytes (max 268,435,455).
    # Checking the multiplier *after* scaling it -- as the MQTT 3.1.1
    # pseudo-code does -- rejects every four byte encoding, so count the
    # bytes instead and only fail when the fourth one asks for a fifth.
    max_length_bytes = 4

    value = 0
    multiplier = 1

    for _ in range(max_length_bytes):
        try:
            encoded_byte = data.read("uint:8")
        except bitstring.ReadError:
            # Not enough data yet, raise that up...
            raise _NeedMoreData()

        value += (encoded_byte & 127) * multiplier

        if not encoded_byte & 128:
            # Continuation bit clear: this was the last length byte.
            return (packet_type, flags, value)

        multiplier *= 128

    raise ParseFailure("Too big packet size")


class MQTTParser(object):
    """
    Incremental parser that turns a stream of MQTT bytes into packet events.

    Bytes are fed in through :meth:`data_received`, which buffers them and
    returns one event per complete packet found. This class uses the server
    handler table and insists that the very first packet is a CONNECT;
    subclasses may override ``_packet_handlers`` and ``_first_pkt``.

    Once a protocol violation has been detected a ``Failure`` event is
    returned and every later call to :meth:`data_received` discards its input.
    """

    # Packet type code -> event class providing ``deserialise(flags, data)``.
    _packet_handlers = server_packet_handlers
    # Packet type that must be the first (and only the first) packet received.
    _first_pkt = P_CONNECT

    def __init__(self):
        # Bytes received but not yet consumed by a completed packet.
        self._data = bitstring.BitStream()
        # Body length (in bytes) announced by the header being collected.
        self._bytes_expected = 0
        # One of WAITING_FOR_NEW_PACKET / COLLECTING_REST_OF_PACKET /
        # PROTOCOL_VIOLATION.
        self._state = WAITING_FOR_NEW_PACKET
        # (packet_type, flags, length) of the packet in progress, else None.
        self._packet_header = None
        # Number of packets successfully deserialised so far.
        self._packet_count = 0

    def data_received(self, data):
        """
        Feed raw bytes into the parser and collect the resulting events.

        :param data: Bytes received from the peer. May hold a fragment of a
            packet, exactly one packet, or several packets.
        :returns: List of events in stream order: one deserialised event per
            complete packet, ending with a ``Failure`` if a protocol violation
            was hit. Empty if no packet was completed, or if the parser had
            already seen a violation before this call.
        """
        if self._state == PROTOCOL_VIOLATION:
            # Conformance statement MQTT-4.8.0-1: Must close the connection on
            # a protocol violation. For us, if we keep getting data somehow
            # (e.g. flushed input buffers), just drop the data.
            return []

        events = []
        self._data.append(bitstring.BitArray(bytes=data))
        # Drop whatever has already been read from the front of the buffer.
        self._data = self._data[self._data.bitpos :]

        while True:
            if self._state == WAITING_FOR_NEW_PACKET and len(self._data) > 8:
                # More than one byte buffered, so a header may be decodable.
                try:
                    self._packet_header = _parse_header(self._data)
                except _NeedMoreData:
                    # Reset the data stream
                    self._data.bitpos = 0
                    # Return the events we have
                    return events
                except ParseFailure as e:
                    events.append(Failure(e.args[0]))
                    self._state = PROTOCOL_VIOLATION
                    return events

                self._bytes_expected = self._packet_header[2]
                self._data = self._data[self._data.bitpos :]
                self._state = COLLECTING_REST_OF_PACKET

            elif self._state == COLLECTING_REST_OF_PACKET:
                self._data = self._data[self._data.bitpos :]

                if len(self._data) < self._bytes_expected * 8:
                    # Body still incomplete; wait for the next call.
                    return events
            else:
                # Waiting for a new packet without enough bytes for a header.
                self._data = self._data[self._data.bitpos :]
                return events

            if self._bytes_expected * 8 <= len(self._data):
                # The whole packet body is buffered: deserialise it.
                self._state = WAITING_FOR_NEW_PACKET

                packet_type, flags, value = self._packet_header

                if self._packet_count == 0 and packet_type != self._first_pkt:
                    self._state = PROTOCOL_VIOLATION
                    return [Failure("Connect packet was not first")]

                if self._packet_count > 0 and packet_type == self._first_pkt:
                    events.append(Failure("Multiple Connect packets"))
                    self._state = PROTOCOL_VIOLATION
                    return events

                try:
                    payload = self._data.read(value * 8)

                    if packet_type not in self._packet_handlers:
                        self._state = PROTOCOL_VIOLATION
                        events.append(Failure("Unimplemented packet type %d" % (packet_type,)))
                        return events

                    packet_handler = self._packet_handlers[packet_type]
                    event = packet_handler.deserialise(flags, payload)
                    events.append(event)
                except ParseFailure as e:
                    # ParseFailure carries either (message,) or
                    # (origin, message), where origin has a __name__.
                    if len(e.args) == 1:
                        events.append(Failure(e.args[0]))
                    else:
                        events.append(Failure(e.args[1] + " in " + e.args[0].__name__))
                    self._state = PROTOCOL_VIOLATION
                    return events
                except bitstring.ReadError as e:
                    # whoops the parsing fell off the amount of data
                    events.append(Failure("Corrupt data, fell off the end: {}".format(e)))
                    self._state = PROTOCOL_VIOLATION
                    return events

                self._packet_header = None
                self._data = self._data[self._data.bitpos :]
                self._packet_count += 1
            else:
                # Header decoded but the body has not fully arrived yet.
                return events
class MQTTClientParser(MQTTParser):
    """
    Parser variant for the client end of a connection.

    Behaves like :class:`MQTTParser` but dispatches through the client handler
    table and requires the first packet received to be a CONNACK.
    """

    _first_pkt = P_CONNACK
    _packet_handlers = client_packet_handlers