Source code for crossbar.common.fswatcher

#####################################################################################
#
#  Copyright (c) typedef int GmbH
#  SPDX-License-Identifier: EUPL-1.2
#
#####################################################################################

import os

from txaio import make_logger

try:
    from watchdog.events import FileSystemEventHandler
    from watchdog.observers import Observer
except ImportError:
[docs] HAS_FS_WATCHER = False
else: HAS_FS_WATCHER = True __all__ = ("FilesystemWatcher", "HAS_FS_WATCHER") if HAS_FS_WATCHER:
[docs] class FilesystemWatcher: """ Watches a directories for file system changes. """
[docs] log = make_logger()
def __init__(self, working_dir=".", watched_dirs=["."]): """ :param watched_dirs: Directories to watch for changes. :type watched_dirs: list of str """
[docs] self._working_dir = working_dir
[docs] self._watched_dirs = watched_dirs
[docs] self._started = False
[docs] self._observer = Observer()
[docs] self._handler = FileSystemEventHandler()
for path in watched_dirs: path = os.path.abspath(os.path.join(working_dir, path)) self._observer.schedule(self._handler, path, recursive=True)
[docs] def start(self, callback): """ Start watching. """ if not self._started: def on_any_event(evt): event = { "type": evt.event_type, "abs_path": os.path.abspath(evt.src_path), "rel_path": os.path.relpath(evt.src_path, self._working_dir), "is_directory": evt.is_directory, } from twisted.internet import reactor reactor.callFromThread(callback, event) self._handler.on_any_event = on_any_event self._observer.start()
[docs] def stop(self): """ Stop watching. """ if self._started: self._observer.stop() self._observer.join() self._started = False
[docs] def is_started(self): """ Check if the watcher is running. """ return self._started