Cours/venv/lib/python3.12/site-packages/watchdog/observers/fsevents.py

340 lines
14 KiB
Python

""":module: watchdog.observers.fsevents
:synopsis: FSEvents based emitter implementation.
:author: yesudeep@google.com (Yesudeep Mangalapilly)
:author: contact@tiger-222.fr (Mickaël Schoentgen)
:platforms: macOS
"""
from __future__ import annotations
import logging
import os
import threading
import time
import unicodedata
from typing import TYPE_CHECKING
import _watchdog_fsevents as _fsevents
from watchdog.events import (
DirCreatedEvent,
DirDeletedEvent,
DirModifiedEvent,
DirMovedEvent,
FileCreatedEvent,
FileDeletedEvent,
FileModifiedEvent,
FileMovedEvent,
generate_sub_created_events,
generate_sub_moved_events,
)
from watchdog.observers.api import DEFAULT_EMITTER_TIMEOUT, DEFAULT_OBSERVER_TIMEOUT, BaseObserver, EventEmitter
from watchdog.utils.dirsnapshot import DirectorySnapshot
if TYPE_CHECKING:
from watchdog.events import FileSystemEvent, FileSystemEventHandler
from watchdog.observers.api import EventQueue, ObservedWatch
logger = logging.getLogger("fsevents")
class FSEventsEmitter(EventEmitter):
"""macOS FSEvents Emitter class.
:param event_queue:
The event queue to fill with events.
:param watch:
A watch object representing the directory to monitor.
:type watch:
:class:`watchdog.observers.api.ObservedWatch`
:param timeout:
Read events blocking timeout (in seconds).
:param event_filter:
Collection of event types to emit, or None for no filtering (default).
:param suppress_history:
The FSEvents API may emit historic events up to 30 sec before the watch was
started. When ``suppress_history`` is ``True``, those events will be suppressed
by creating a directory snapshot of the watched path before starting the stream
as a reference to suppress old events. Warning: This may result in significant
memory usage in case of a large number of items in the watched path.
:type timeout:
``float``
"""
def __init__(
self,
event_queue: EventQueue,
watch: ObservedWatch,
*,
timeout: float = DEFAULT_EMITTER_TIMEOUT,
event_filter: list[type[FileSystemEvent]] | None = None,
suppress_history: bool = False,
) -> None:
super().__init__(event_queue, watch, timeout=timeout, event_filter=event_filter)
self._fs_view: set[int] = set()
self.suppress_history = suppress_history
self._start_time = 0.0
self._starting_state: DirectorySnapshot | None = None
self._lock = threading.Lock()
self._absolute_watch_path = os.path.realpath(os.path.abspath(os.path.expanduser(self.watch.path)))
def on_thread_stop(self) -> None:
_fsevents.remove_watch(self.watch)
_fsevents.stop(self)
def queue_event(self, event: FileSystemEvent) -> None:
# fsevents defaults to be recursive, so if the watch was meant to be non-recursive then we need to drop
# all the events here which do not have a src_path / dest_path that matches the watched path
if self._watch.is_recursive or not self._is_recursive_event(event):
logger.debug("queue_event %s", event)
EventEmitter.queue_event(self, event)
else:
logger.debug("drop event %s", event)
def _is_recursive_event(self, event: FileSystemEvent) -> bool:
src_path = event.src_path if event.is_directory else os.path.dirname(event.src_path)
if src_path == self._absolute_watch_path:
return False
if isinstance(event, (FileMovedEvent, DirMovedEvent)):
# when moving something into the watch path we must always take the dirname,
# otherwise we miss out on `DirMovedEvent`s
dest_path = os.path.dirname(event.dest_path)
if dest_path == self._absolute_watch_path:
return False
return True
def _queue_created_event(self, event: FileSystemEvent, src_path: bytes | str, dirname: bytes | str) -> None:
cls = DirCreatedEvent if event.is_directory else FileCreatedEvent
self.queue_event(cls(src_path))
self.queue_event(DirModifiedEvent(dirname))
def _queue_deleted_event(self, event: FileSystemEvent, src_path: bytes | str, dirname: bytes | str) -> None:
cls = DirDeletedEvent if event.is_directory else FileDeletedEvent
self.queue_event(cls(src_path))
self.queue_event(DirModifiedEvent(dirname))
def _queue_modified_event(self, event: FileSystemEvent, src_path: bytes | str, dirname: bytes | str) -> None:
cls = DirModifiedEvent if event.is_directory else FileModifiedEvent
self.queue_event(cls(src_path))
def _queue_renamed_event(
self,
src_event: FileSystemEvent,
src_path: bytes | str,
dst_path: bytes | str,
src_dirname: bytes | str,
dst_dirname: bytes | str,
) -> None:
cls = DirMovedEvent if src_event.is_directory else FileMovedEvent
dst_path = self._encode_path(dst_path)
self.queue_event(cls(src_path, dst_path))
self.queue_event(DirModifiedEvent(src_dirname))
self.queue_event(DirModifiedEvent(dst_dirname))
def _is_historic_created_event(self, event: _fsevents.NativeEvent) -> bool:
# We only queue a created event if the item was created after we
# started the FSEventsStream.
in_history = event.inode in self._fs_view
if self._starting_state:
try:
old_inode = self._starting_state.inode(event.path)[0]
before_start = old_inode == event.inode
except KeyError:
before_start = False
else:
before_start = False
return in_history or before_start
@staticmethod
def _is_meta_mod(event: _fsevents.NativeEvent) -> bool:
"""Returns True if the event indicates a change in metadata."""
return event.is_inode_meta_mod or event.is_xattr_mod or event.is_owner_change
def queue_events(self, timeout: float, events: list[_fsevents.NativeEvent]) -> None: # type: ignore[override]
if logger.getEffectiveLevel() <= logging.DEBUG:
for event in events:
flags = ", ".join(attr for attr in dir(event) if getattr(event, attr) is True)
logger.debug("%s: %s", event, flags)
if time.monotonic() - self._start_time > 60:
# Event history is no longer needed, let's free some memory.
self._starting_state = None
while events:
event = events.pop(0)
src_path = self._encode_path(event.path)
src_dirname = os.path.dirname(src_path)
try:
stat = os.stat(src_path)
except OSError:
stat = None
exists = stat and stat.st_ino == event.inode
# FSevents may coalesce multiple events for the same item + path into a
# single event. However, events are never coalesced for different items at
# the same path or for the same item at different paths. Therefore, the
# event chains "removed -> created" and "created -> renamed -> removed" will
# never emit a single native event and a deleted event *always* means that
# the item no longer existed at the end of the event chain.
# Some events will have a spurious `is_created` flag set, coalesced from an
# already emitted and processed CreatedEvent. To filter those, we keep track
# of all inodes which we know to be already created. This is safer than
# keeping track of paths since paths are more likely to be reused than
# inodes.
# Likewise, some events will have a spurious `is_modified`,
# `is_inode_meta_mod` or `is_xattr_mod` flag set. We currently do not
# suppress those but could do so if the item still exists by caching the
# stat result and verifying that it did change.
if event.is_created and event.is_removed:
# Events will only be coalesced for the same item / inode.
# The sequence deleted -> created therefore cannot occur.
# Any combination with renamed cannot occur either.
if not self._is_historic_created_event(event):
self._queue_created_event(event, src_path, src_dirname)
self._fs_view.add(event.inode)
if event.is_modified or self._is_meta_mod(event):
self._queue_modified_event(event, src_path, src_dirname)
self._queue_deleted_event(event, src_path, src_dirname)
self._fs_view.discard(event.inode)
else:
if event.is_created and not self._is_historic_created_event(event):
self._queue_created_event(event, src_path, src_dirname)
self._fs_view.add(event.inode)
if event.is_modified or self._is_meta_mod(event):
self._queue_modified_event(event, src_path, src_dirname)
if event.is_renamed:
# Check if we have a corresponding destination event in the watched path.
dst_event = next(
iter(e for e in events if e.is_renamed and e.inode == event.inode),
None,
)
if dst_event:
# Item was moved within the watched folder.
logger.debug("Destination event for rename is %s", dst_event)
dst_path = self._encode_path(dst_event.path)
dst_dirname = os.path.dirname(dst_path)
self._queue_renamed_event(event, src_path, dst_path, src_dirname, dst_dirname)
self._fs_view.add(event.inode)
for sub_moved_event in generate_sub_moved_events(src_path, dst_path):
self.queue_event(sub_moved_event)
# Process any coalesced flags for the dst_event.
events.remove(dst_event)
if dst_event.is_modified or self._is_meta_mod(dst_event):
self._queue_modified_event(dst_event, dst_path, dst_dirname)
if dst_event.is_removed:
self._queue_deleted_event(dst_event, dst_path, dst_dirname)
self._fs_view.discard(dst_event.inode)
elif exists:
# This is the destination event, item was moved into the watched
# folder.
self._queue_created_event(event, src_path, src_dirname)
self._fs_view.add(event.inode)
for sub_created_event in generate_sub_created_events(src_path):
self.queue_event(sub_created_event)
else:
# This is the source event, item was moved out of the watched
# folder.
self._queue_deleted_event(event, src_path, src_dirname)
self._fs_view.discard(event.inode)
# Skip further coalesced processing.
continue
if event.is_removed:
# Won't occur together with renamed.
self._queue_deleted_event(event, src_path, src_dirname)
self._fs_view.discard(event.inode)
if event.is_root_changed:
# This will be set if root or any of its parents is renamed or deleted.
# TODO: find out new path and generate DirMovedEvent?
self.queue_event(DirDeletedEvent(self.watch.path))
logger.debug("Stopping because root path was changed")
self.stop()
self._fs_view.clear()
def events_callback(self, paths: list[bytes], inodes: list[int], flags: list[int], ids: list[int]) -> None:
"""Callback passed to FSEventStreamCreate(), it will receive all
FS events and queue them.
"""
cls = _fsevents.NativeEvent
try:
events = [
cls(path, inode, event_flags, event_id)
for path, inode, event_flags, event_id in zip(paths, inodes, flags, ids)
]
with self._lock:
self.queue_events(self.timeout, events)
except Exception:
logger.exception("Unhandled exception in fsevents callback")
def run(self) -> None:
self.pathnames = [self.watch.path]
self._start_time = time.monotonic()
try:
_fsevents.add_watch(self, self.watch, self.events_callback, self.pathnames)
_fsevents.read_events(self)
except Exception:
logger.exception("Unhandled exception in FSEventsEmitter")
def on_thread_start(self) -> None:
if self.suppress_history:
watch_path = os.fsdecode(self.watch.path) if isinstance(self.watch.path, bytes) else self.watch.path
self._starting_state = DirectorySnapshot(watch_path)
def _encode_path(self, path: bytes | str) -> bytes | str:
"""Encode path only if bytes were passed to this emitter."""
return os.fsencode(path) if isinstance(self.watch.path, bytes) else path
class FSEventsObserver(BaseObserver):
def __init__(self, *, timeout: float = DEFAULT_OBSERVER_TIMEOUT) -> None:
super().__init__(FSEventsEmitter, timeout=timeout)
def schedule(
self,
event_handler: FileSystemEventHandler,
path: str,
*,
recursive: bool = False,
event_filter: list[type[FileSystemEvent]] | None = None,
) -> ObservedWatch:
# Fix for issue #26: Trace/BPT error when given a unicode path
# string. https://github.com/gorakhargosh/watchdog/issues#issue/26
if isinstance(path, str):
path = unicodedata.normalize("NFC", path)
return super().schedule(event_handler, path, recursive=recursive, event_filter=event_filter)