Source code for crossbar.bridge.mqtt._utils
#####################################################################################
#
# Copyright (c) typedef int GmbH
# SPDX-License-Identifier: EUPL-1.2
#
#####################################################################################
from autobahn.websocket.utf8validator import Utf8Validator
from bitstring import pack
[docs]
_validator = Utf8Validator()
[docs]
class ParseFailure(Exception):
pass
[docs]
class SerialisationFailure(Exception):
pass
[docs]
def read_prefixed_data(data):
"""
Reads the next 16-bit-uint prefixed data block from `data`.
"""
data_length = data.read("uint:16")
return data.read(data_length * 8).bytes
[docs]
def read_string(data):
"""
Reads the next MQTT pascal-style string from `data`.
"""
byte_data = read_prefixed_data(data)
_validator.reset()
if _validator.validate(byte_data)[0]:
decoded = byte_data.decode("utf8", "strict")
if "\u0000" in decoded:
raise ParseFailure("Invalid UTF-8 string (contains nulls)")
return decoded
else:
raise ParseFailure("Invalid UTF-8 string (contains surrogates)")
[docs]
def build_string(string):
string = string.encode("utf8")
return pack("uint:16", len(string)).bytes + string
[docs]
def iterbytes(b):
for i in range(len(b)):
yield b[i : i + 1]