Source code for crossbar.edge.worker.monitor._disk

##############################################################################
#
#                        Crossbar.io
#     Copyright (C) typedef int GmbH. All rights reserved.
#
##############################################################################

from autobahn.util import utcnow

from crossbar.edge.worker.monitor._base import Monitor

__all__ = ("IOMonitor",)


class IOMonitor(Monitor):
    """
    IO monitoring.

    This is using Linux procfs to get measurements: for every configured
    block device the counters in ``/sys/block/<device>/stat`` are read on
    each poll, and the change since the previous poll is reported,
    normalized by the effective poll period.

    The monitor configuration is expected to look like::

        {
            "storage": [
                {"id": <subsystem id>,
                 "devices": [(<device name>, <device label>), ...]},
                ...
            ]
        }
    """

    ID = "diskio"

    def __init__(self, config=None):
        """
        :param config: Monitor configuration. The optional ``"storage"`` key
            lists the storage subsystems (and their block devices) to watch.
            When no configuration is given, no device is monitored.
        :type config: dict or None
        """
        Monitor.__init__(self, config)

        # ``config`` defaults to None: treat a missing configuration as an
        # empty one instead of failing on ``None.get``
        self._storage = (self._config or {}).get("storage", [])

        # flat list of block devices
        self._devices = [
            device
            for subsystem in self._storage
            for device, _ in subsystem["devices"]
        ]

        # map indexed by device holding last raw (cumulative) values
        self._last = {device: None for device in self._devices}

        # map indexed by device holding last values (since previous sample)
        self._values = {device: None for device in self._devices}

    @staticmethod
    def _read_stat(device):
        """
        Read the raw counters of one block device from procfs.

        :param device: Block device name as it appears under ``/sys/block``.
        :returns: The whitespace separated fields of
            ``/sys/block/<device>/stat`` as a list of ints.
        """
        # see: https://www.kernel.org/doc/Documentation/block/stat.txt
        with open("/sys/block/{}/stat".format(device)) as fd:
            return [int(field) for field in fd.read().split()]

    def poll(self):
        """
        Measure current stats value and return new stats.

        :returns: The new event, a dict with the keys ``timestamp``,
            ``last_period`` and ``subsystems``. It is also remembered as
            the monitor's last value.
        :raises OSError: If the stat file of a configured device cannot be
            opened.
        """
        Monitor.poll(self)

        # create new, empty event
        current = {
            # the UTC timestamp when measurement was taken
            "timestamp": utcnow(),
            # the effective last period in seconds
            "last_period": self._last_period,
            # storage subsystem measurements
            "subsystems": [],
        }

        # normalize with effective period (fall back to 1.0 while no period
        # is known yet, which also avoids a division by zero)
        diff = self._last_period or 1.0

        # get IO stats per device from procfs (/sys/block/<device>/stat)
        for device in self._devices:
            new = self._read_stat(device)

            # on the very first sample there is nothing to compare against:
            # use the sample itself, so that all deltas come out as zero
            last = self._last[device] or new

            def rate(index, scale=1):
                # change of a cumulative counter, per unit of poll period
                return int(scale * (new[index] - last[index]) / diff)

            self._values[device] = {
                "read_ios": rate(0),
                "read_merges": rate(1),
                # sector counters are converted to bytes (512 bytes each)
                "read_bytes": rate(2, 512),
                "read_ticks": rate(3),
                "write_ios": rate(4),
                "write_merges": rate(5),
                "write_bytes": rate(6, 512),
                "write_ticks": rate(7),
                # in_flight is reported as read, not as a delta
                "in_flight": new[8],
                "io_ticks": rate(9),
                "time_in_queue": rate(10),
            }
            self._last[device] = new

        # transform raw measurements into target event structure
        for subsys in self._storage:
            subsystem = {"id": subsys["id"], "devices": []}
            for device_id, device_label in subsys["devices"]:
                values = self._values[device_id]
                subsystem["devices"].append(
                    {
                        "id": device_id,
                        "type": device_label,
                        "read_ios": values["read_ios"],
                        "read_bytes": values["read_bytes"],
                        "read_ms": values["read_ticks"],
                        "write_ios": values["write_ios"],
                        "write_bytes": values["write_bytes"],
                        "write_ms": values["write_ticks"],
                        "in_flight": values["in_flight"],
                        "active_ms": values["io_ticks"],
                        "wait_ms": values["time_in_queue"],
                    }
                )
            current["subsystems"].append(subsystem)

        self._last_value = current
        return self._last_value