Source code for crossbar.edge.worker.monitor._network
##############################################################################
#
# Crossbar.io
# Copyright (C) typedef int GmbH. All rights reserved.
#
##############################################################################
import psutil
from txaio import perf_counter_ns
from crossbar.edge.worker.monitor._base import Monitor
__all__ = ("NetMonitor",)
[docs]
class NetMonitor(Monitor):
"""
Network monitoring via psutils.
"""
[docs]
def poll(self):
"""
Measure current stats value and return new stats.
"""
hdata = Monitor.poll(self)
start = perf_counter_ns()
counters = {}
io_counters = psutil.net_io_counters(True)
for dev in io_counters:
if dev.startswith("virbr") or dev.startswith("lo") or dev.startswith("docker"):
continue
counters[dev] = io_counters[dev]._asdict()
hdata["net_io_counters"] = counters
hdata["net_if_addrs"] = psutil.net_if_addrs()
hdata["elapsed"] = perf_counter_ns() - start
self._last_value = hdata
return hdata