from __future__ import annotations import contextlib import queue import threading from collections import defaultdict from pathlib import Path from typing import TYPE_CHECKING from watchdog.utils import BaseThread from watchdog.utils.bricks import SkipRepeatsQueue if TYPE_CHECKING: from watchdog.events import FileSystemEvent, FileSystemEventHandler DEFAULT_EMITTER_TIMEOUT = 1.0 # in seconds DEFAULT_OBSERVER_TIMEOUT = 1.0 # in seconds class EventQueue(SkipRepeatsQueue): """Thread-safe event queue based on a special queue that skips adding the same event (:class:`FileSystemEvent`) multiple times consecutively. Thus avoiding dispatching multiple event handling calls when multiple identical events are produced quicker than an observer can consume them. """ class ObservedWatch: """An scheduled watch. :param path: Path string. :param recursive: ``True`` if watch is recursive; ``False`` otherwise. :param event_filter: Optional collection of :class:`watchdog.events.FileSystemEvent` to watch """ def __init__(self, path: str | Path, *, recursive: bool, event_filter: list[type[FileSystemEvent]] | None = None): self._path = str(path) if isinstance(path, Path) else path self._is_recursive = recursive self._event_filter = frozenset(event_filter) if event_filter is not None else None @property def path(self) -> str: """The path that this watch monitors.""" return self._path @property def is_recursive(self) -> bool: """Determines whether subdirectories are watched for the path.""" return self._is_recursive @property def event_filter(self) -> frozenset[type[FileSystemEvent]] | None: """Collection of event types watched for the path""" return self._event_filter @property def key(self) -> tuple[str, bool, frozenset[type[FileSystemEvent]] | None]: return self.path, self.is_recursive, self.event_filter def __eq__(self, watch: object) -> bool: if not isinstance(watch, ObservedWatch): return NotImplemented return self.key == watch.key def __ne__(self, watch: object) -> bool: if not isinstance(watch, ObservedWatch): return NotImplemented return self.key != watch.key def __hash__(self) -> int: return hash(self.key) def __repr__(self) -> str: if self.event_filter is not None: event_filter_str = "|".join(sorted(_cls.__name__ for _cls in self.event_filter)) event_filter_str = f", event_filter={event_filter_str}" else: event_filter_str = "" return f"<{type(self).__name__}: path={self.path!r}, is_recursive={self.is_recursive}{event_filter_str}>" # Observer classes class EventEmitter(BaseThread): """Producer thread base class subclassed by event emitters that generate events and populate a queue with them. :param event_queue: The event queue to populate with generated events. :type event_queue: :class:`watchdog.events.EventQueue` :param watch: The watch to observe and produce events for. :type watch: :class:`ObservedWatch` :param timeout: Timeout (in seconds) between successive attempts at reading events. :type timeout: ``float`` :param event_filter: Collection of event types to emit, or None for no filtering (default). :type event_filter: Iterable[:class:`watchdog.events.FileSystemEvent`] | None """ def __init__( self, event_queue: EventQueue, watch: ObservedWatch, *, timeout: float = DEFAULT_EMITTER_TIMEOUT, event_filter: list[type[FileSystemEvent]] | None = None, ) -> None: super().__init__() self._event_queue = event_queue self._watch = watch self._timeout = timeout self._event_filter = frozenset(event_filter) if event_filter is not None else None @property def timeout(self) -> float: """Blocking timeout for reading events.""" return self._timeout @property def watch(self) -> ObservedWatch: """The watch associated with this emitter.""" return self._watch def queue_event(self, event: FileSystemEvent) -> None: """Queues a single event. :param event: Event to be queued. :type event: An instance of :class:`watchdog.events.FileSystemEvent` or a subclass. """ if self._event_filter is None or any(isinstance(event, cls) for cls in self._event_filter): self._event_queue.put((event, self.watch)) def queue_events(self, timeout: float) -> None: """Override this method to populate the event queue with events per interval period. :param timeout: Timeout (in seconds) between successive attempts at reading events. :type timeout: ``float`` """ def run(self) -> None: while self.should_keep_running(): self.queue_events(self.timeout) class EventDispatcher(BaseThread): """Consumer thread base class subclassed by event observer threads that dispatch events from an event queue to appropriate event handlers. :param timeout: Timeout value (in seconds) passed to emitters constructions in the child class BaseObserver. :type timeout: ``float`` """ stop_event = object() """Event inserted into the queue to signal a requested stop.""" def __init__(self, *, timeout: float = DEFAULT_OBSERVER_TIMEOUT) -> None: super().__init__() self._event_queue = EventQueue() self._timeout = timeout @property def timeout(self) -> float: """Timeout value to construct emitters with.""" return self._timeout def stop(self) -> None: BaseThread.stop(self) with contextlib.suppress(queue.Full): self.event_queue.put_nowait(EventDispatcher.stop_event) @property def event_queue(self) -> EventQueue: """The event queue which is populated with file system events by emitters and from which events are dispatched by a dispatcher thread. """ return self._event_queue def dispatch_events(self, event_queue: EventQueue) -> None: """Override this method to consume events from an event queue, blocking on the queue for the specified timeout before raising :class:`queue.Empty`. :param event_queue: Event queue to populate with one set of events. :type event_queue: :class:`EventQueue` :raises: :class:`queue.Empty` """ def run(self) -> None: while self.should_keep_running(): try: self.dispatch_events(self.event_queue) except queue.Empty: continue class BaseObserver(EventDispatcher): """Base observer.""" def __init__(self, emitter_class: type[EventEmitter], *, timeout: float = DEFAULT_OBSERVER_TIMEOUT) -> None: super().__init__(timeout=timeout) self._emitter_class = emitter_class self._lock = threading.RLock() self._watches: set[ObservedWatch] = set() self._handlers: defaultdict[ObservedWatch, set[FileSystemEventHandler]] = defaultdict(set) self._emitters: set[EventEmitter] = set() self._emitter_for_watch: dict[ObservedWatch, EventEmitter] = {} def _add_emitter(self, emitter: EventEmitter) -> None: self._emitter_for_watch[emitter.watch] = emitter self._emitters.add(emitter) def _remove_emitter(self, emitter: EventEmitter) -> None: del self._emitter_for_watch[emitter.watch] self._emitters.remove(emitter) emitter.stop() with contextlib.suppress(RuntimeError): emitter.join() def _clear_emitters(self) -> None: for emitter in self._emitters: emitter.stop() for emitter in self._emitters: with contextlib.suppress(RuntimeError): emitter.join() self._emitters.clear() self._emitter_for_watch.clear() def _add_handler_for_watch(self, event_handler: FileSystemEventHandler, watch: ObservedWatch) -> None: self._handlers[watch].add(event_handler) def _remove_handlers_for_watch(self, watch: ObservedWatch) -> None: del self._handlers[watch] @property def emitters(self) -> set[EventEmitter]: """Returns event emitter created by this observer.""" return self._emitters def start(self) -> None: for emitter in self._emitters.copy(): try: emitter.start() except Exception: self._remove_emitter(emitter) raise super().start() def schedule( self, event_handler: FileSystemEventHandler, path: str, *, recursive: bool = False, event_filter: list[type[FileSystemEvent]] | None = None, ) -> ObservedWatch: """Schedules watching a path and calls appropriate methods specified in the given event handler in response to file system events. :param event_handler: An event handler instance that has appropriate event handling methods which will be called by the observer in response to file system events. :type event_handler: :class:`watchdog.events.FileSystemEventHandler` or a subclass :param path: Directory path that will be monitored. :type path: ``str`` :param recursive: ``True`` if events will be emitted for sub-directories traversed recursively; ``False`` otherwise. :type recursive: ``bool`` :param event_filter: Collection of event types to emit, or None for no filtering (default). :type event_filter: Iterable[:class:`watchdog.events.FileSystemEvent`] | None :return: An :class:`ObservedWatch` object instance representing a watch. """ with self._lock: watch = ObservedWatch(path, recursive=recursive, event_filter=event_filter) self._add_handler_for_watch(event_handler, watch) # If we don't have an emitter for this watch already, create it. if watch not in self._emitter_for_watch: emitter = self._emitter_class(self.event_queue, watch, timeout=self.timeout, event_filter=event_filter) if self.is_alive(): emitter.start() self._add_emitter(emitter) self._watches.add(watch) return watch def add_handler_for_watch(self, event_handler: FileSystemEventHandler, watch: ObservedWatch) -> None: """Adds a handler for the given watch. :param event_handler: An event handler instance that has appropriate event handling methods which will be called by the observer in response to file system events. :type event_handler: :class:`watchdog.events.FileSystemEventHandler` or a subclass :param watch: The watch to add a handler for. :type watch: An instance of :class:`ObservedWatch` or a subclass of :class:`ObservedWatch` """ with self._lock: self._add_handler_for_watch(event_handler, watch) def remove_handler_for_watch(self, event_handler: FileSystemEventHandler, watch: ObservedWatch) -> None: """Removes a handler for the given watch. :param event_handler: An event handler instance that has appropriate event handling methods which will be called by the observer in response to file system events. :type event_handler: :class:`watchdog.events.FileSystemEventHandler` or a subclass :param watch: The watch to remove a handler for. :type watch: An instance of :class:`ObservedWatch` or a subclass of :class:`ObservedWatch` """ with self._lock: self._handlers[watch].remove(event_handler) def unschedule(self, watch: ObservedWatch) -> None: """Unschedules a watch. :param watch: The watch to unschedule. :type watch: An instance of :class:`ObservedWatch` or a subclass of :class:`ObservedWatch` """ with self._lock: emitter = self._emitter_for_watch[watch] del self._handlers[watch] self._remove_emitter(emitter) self._watches.remove(watch) def unschedule_all(self) -> None: """Unschedules all watches and detaches all associated event handlers.""" with self._lock: self._handlers.clear() self._clear_emitters() self._watches.clear() def on_thread_stop(self) -> None: self.unschedule_all() def dispatch_events(self, event_queue: EventQueue) -> None: entry = event_queue.get(block=True) if entry is EventDispatcher.stop_event: return event, watch = entry with self._lock: # To allow unschedule/stop and safe removal of event handlers # within event handlers itself, check if the handler is still # registered after every dispatch. for handler in self._handlers[watch].copy(): if handler in self._handlers[watch]: handler.dispatch(event) event_queue.task_done()