Source code for crossbar.edge.worker.monitor._storage

##############################################################################
#
#                        Crossbar.io
#     Copyright (C) typedef int GmbH. All rights reserved.
#
##############################################################################

import psutil
from txaio import perf_counter_ns

from crossbar.edge.worker.monitor._base import Monitor

__all__ = ("StorageMonitor",)


[docs] class StorageMonitor(Monitor): """ Storage and disk IO monitoring via psutils. """
[docs] ID = "storage"
[docs] def poll(self): """ Measure current stats value and return new stats. """ hdata = Monitor.poll(self) start = perf_counter_ns() devices = {} usage = {} counters = psutil.disk_io_counters(True) for dev in psutil.disk_partitions(): if dev.device.startswith("/dev/loop"): continue key = dev.device.split("/")[-1] if key not in devices: if key in counters: devices[key] = dict(dev._asdict(), **counters[key]._asdict()) else: devices[key] = dev._asdict() usage[key] = psutil.disk_usage(dev.mountpoint)._asdict() hdata["devices"] = devices hdata["usage"] = usage hdata["elapsed"] = perf_counter_ns() - start self._last_value = hdata return hdata