Source code for crossbar.common.reloader
#####################################################################################
#
# Copyright (c) typedef int GmbH
# SPDX-License-Identifier: EUPL-1.2
#
#####################################################################################
import os
import sys
from txaio import make_logger
try:
reload
except NameError:
# Python 3
from importlib import reload
__all__ = ("TrackingModuleReloader",)
def get_module_path_and_mtime(m):
"""
Given a Python module, returns a pair (source file, source file mtime).
:param m: A Python module instance.
:type m: obj
"""
res = (None, None)
if m and getattr(m, "__file__", None) and (m.__file__.endswith(".py") or m.__file__.endswith(".pyc")):
f = m.__file__
if f.endswith(".pyc"):
f = f[:-1]
try:
mtime = os.stat(f)[8]
except:
res = (f, None)
else:
res = (f, mtime)
return res
[docs]
class TrackingModuleReloader:
"""
A tracking module reloader.
This will track modules loaded _after_ a snapshot (point in time), and
later allow to force reload of exactly those modules that have been (first)
loaded after that point in time.
"""
def __init__(self, use_mtimes=True, snapshot=True):
"""
:param use_mtimes: If `True`, try to use file modification times to limit
module reloading to actually changed files.
:type use_mtimes: bool
"""
[docs]
self._use_mtimes = use_mtimes
if snapshot:
self.snapshot()
[docs]
def snapshot(self):
"""
Establish a snapshot - that is, remember which modules are currently
loaded. Later, when reload() is called, only modules imported later
will be (forcefully) reloaded.
"""
self._modules = sys.modules.copy()
# do mtime tracking ..
if self._use_mtimes:
self._module_mtimes = {}
for mod_name, mod in self._modules.items():
self._module_mtimes[mod_name] = get_module_path_and_mtime(mod)
[docs]
def reload(self):
"""
Trigger reloading of all modules imported later than the last snapshot
established.
:returns int -- Number of modules reloaded.
"""
current_modules = sys.modules
maybe_dirty_modules = set(current_modules.keys()) - set(self._modules.keys())
reload_modules = []
# use tracked mtimes to restrict set of actually reloaded modules, while
# trying to be conservative (if stuff fails or cannot be determined, opt
# for reloading the module -- even if it actually hasn't changed)
#
if self._use_mtimes:
for mod_name in maybe_dirty_modules:
m = current_modules[mod_name]
if mod_name in self._module_mtimes:
f, new_mtime = get_module_path_and_mtime(m)
_, old_mtime = self._module_mtimes[mod_name]
if new_mtime == old_mtime:
self.log.debug("Module {mod_name} unchanged", mod_name=mod_name)
else:
self._module_mtimes[mod_name] = (f, new_mtime)
reload_modules.append(mod_name)
self.log.debug("Change of module {mod_name} detected (file {f}).", mod_name=mod_name, f=f)
else:
self._module_mtimes[mod_name] = get_module_path_and_mtime(m)
reload_modules.append(mod_name)
self.log.debug("Tracking new module {mod_name}", mod_name=mod_name)
else:
reload_modules = maybe_dirty_modules
if len(reload_modules):
self.log.debug("Reloading {modules} possibly changed modules", modules=len(reload_modules))
for module in reload_modules:
self.log.debug("Reloading module {module}", module=module)
# this is doing the actual work
#
reload(current_modules[module])
else:
self.log.debug("No modules to reload")
return reload_modules