from __future__ import annotations import contextlib import ctypes import ctypes.util import errno import os import struct import threading from ctypes import c_char_p, c_int, c_uint32 from functools import reduce from typing import TYPE_CHECKING from watchdog.utils import UnsupportedLibcError if TYPE_CHECKING: from collections.abc import Generator libc = ctypes.CDLL(None) if not hasattr(libc, "inotify_init") or not hasattr(libc, "inotify_add_watch") or not hasattr(libc, "inotify_rm_watch"): error = f"Unsupported libc version found: {libc._name}" # noqa:SLF001 raise UnsupportedLibcError(error) inotify_add_watch = ctypes.CFUNCTYPE(c_int, c_int, c_char_p, c_uint32, use_errno=True)(("inotify_add_watch", libc)) inotify_rm_watch = ctypes.CFUNCTYPE(c_int, c_int, c_uint32, use_errno=True)(("inotify_rm_watch", libc)) inotify_init = ctypes.CFUNCTYPE(c_int, use_errno=True)(("inotify_init", libc)) class InotifyConstants: # User-space events IN_ACCESS = 0x00000001 # File was accessed. IN_MODIFY = 0x00000002 # File was modified. IN_ATTRIB = 0x00000004 # Meta-data changed. IN_CLOSE_WRITE = 0x00000008 # Writable file was closed. IN_CLOSE_NOWRITE = 0x00000010 # Unwritable file closed. IN_OPEN = 0x00000020 # File was opened. IN_MOVED_FROM = 0x00000040 # File was moved from X. IN_MOVED_TO = 0x00000080 # File was moved to Y. IN_CREATE = 0x00000100 # Subfile was created. IN_DELETE = 0x00000200 # Subfile was deleted. IN_DELETE_SELF = 0x00000400 # Self was deleted. IN_MOVE_SELF = 0x00000800 # Self was moved. # Helper user-space events. IN_MOVE = IN_MOVED_FROM | IN_MOVED_TO # Moves. # Events sent by the kernel to a watch. IN_UNMOUNT = 0x00002000 # Backing file system was unmounted. IN_Q_OVERFLOW = 0x00004000 # Event queued overflowed. IN_IGNORED = 0x00008000 # File was ignored. # Special flags. IN_ONLYDIR = 0x01000000 # Only watch the path if it's a directory. IN_DONT_FOLLOW = 0x02000000 # Do not follow a symbolic link. IN_EXCL_UNLINK = 0x04000000 # Exclude events on unlinked objects IN_MASK_ADD = 0x20000000 # Add to the mask of an existing watch. IN_ISDIR = 0x40000000 # Event occurred against directory. IN_ONESHOT = 0x80000000 # Only send event once. # All user-space events. IN_ALL_EVENTS = reduce( lambda x, y: x | y, [ IN_ACCESS, IN_MODIFY, IN_ATTRIB, IN_CLOSE_WRITE, IN_CLOSE_NOWRITE, IN_OPEN, IN_MOVED_FROM, IN_MOVED_TO, IN_DELETE, IN_CREATE, IN_DELETE_SELF, IN_MOVE_SELF, ], ) # Flags for ``inotify_init1`` IN_CLOEXEC = 0x02000000 IN_NONBLOCK = 0x00004000 # Watchdog's API cares only about these events. WATCHDOG_ALL_EVENTS = reduce( lambda x, y: x | y, [ InotifyConstants.IN_MODIFY, InotifyConstants.IN_ATTRIB, InotifyConstants.IN_MOVED_FROM, InotifyConstants.IN_MOVED_TO, InotifyConstants.IN_CREATE, InotifyConstants.IN_DELETE, InotifyConstants.IN_DELETE_SELF, InotifyConstants.IN_DONT_FOLLOW, InotifyConstants.IN_CLOSE_WRITE, InotifyConstants.IN_CLOSE_NOWRITE, InotifyConstants.IN_OPEN, ], ) class InotifyEventStruct(ctypes.Structure): """Structure representation of the inotify_event structure (used in buffer size calculations):: struct inotify_event { __s32 wd; /* watch descriptor */ __u32 mask; /* watch mask */ __u32 cookie; /* cookie to synchronize two events */ __u32 len; /* length (including nulls) of name */ char name[0]; /* stub for possible name */ }; """ _fields_ = ( ("wd", c_int), ("mask", c_uint32), ("cookie", c_uint32), ("len", c_uint32), ("name", c_char_p), ) EVENT_SIZE = ctypes.sizeof(InotifyEventStruct) DEFAULT_NUM_EVENTS = 2048 DEFAULT_EVENT_BUFFER_SIZE = DEFAULT_NUM_EVENTS * (EVENT_SIZE + 16) class Inotify: """Linux inotify(7) API wrapper class. :param path: The directory path for which we want an inotify object. :type path: :class:`bytes` :param recursive: ``True`` if subdirectories should be monitored; ``False`` otherwise. """ def __init__(self, path: bytes, *, recursive: bool = False, event_mask: int | None = None) -> None: # The file descriptor associated with the inotify instance. inotify_fd = inotify_init() if inotify_fd == -1: Inotify._raise_error() self._inotify_fd = inotify_fd self._lock = threading.Lock() # Stores the watch descriptor for a given path. self._wd_for_path: dict[bytes, int] = {} self._path_for_wd: dict[int, bytes] = {} self._path = path # Default to all events if event_mask is None: event_mask = WATCHDOG_ALL_EVENTS self._event_mask = event_mask self._is_recursive = recursive if os.path.isdir(path): self._add_dir_watch(path, event_mask, recursive=recursive) else: self._add_watch(path, event_mask) self._moved_from_events: dict[int, InotifyEvent] = {} @property def event_mask(self) -> int: """The event mask for this inotify instance.""" return self._event_mask @property def path(self) -> bytes: """The path associated with the inotify instance.""" return self._path @property def is_recursive(self) -> bool: """Whether we are watching directories recursively.""" return self._is_recursive @property def fd(self) -> int: """The file descriptor associated with the inotify instance.""" return self._inotify_fd def clear_move_records(self) -> None: """Clear cached records of MOVED_FROM events""" self._moved_from_events = {} def source_for_move(self, destination_event: InotifyEvent) -> bytes | None: """The source path corresponding to the given MOVED_TO event. If the source path is outside the monitored directories, None is returned instead. """ if destination_event.cookie in self._moved_from_events: return self._moved_from_events[destination_event.cookie].src_path return None def remember_move_from_event(self, event: InotifyEvent) -> None: """Save this event as the source event for future MOVED_TO events to reference. """ self._moved_from_events[event.cookie] = event def add_watch(self, path: bytes) -> None: """Adds a watch for the given path. :param path: Path to begin monitoring. """ with self._lock: self._add_watch(path, self._event_mask) def remove_watch(self, path: bytes) -> None: """Removes a watch for the given path. :param path: Path string for which the watch will be removed. """ with self._lock: wd = self._wd_for_path.pop(path) del self._path_for_wd[wd] if inotify_rm_watch(self._inotify_fd, wd) == -1: Inotify._raise_error() def close(self) -> None: """Closes the inotify instance and removes all associated watches.""" with self._lock: if self._path in self._wd_for_path: wd = self._wd_for_path[self._path] inotify_rm_watch(self._inotify_fd, wd) # descriptor may be invalid because file was deleted with contextlib.suppress(OSError): os.close(self._inotify_fd) def read_events(self, *, event_buffer_size: int = DEFAULT_EVENT_BUFFER_SIZE) -> list[InotifyEvent]: """Reads events from inotify and yields them.""" # HACK: We need to traverse the directory path # recursively and simulate events for newly # created subdirectories/files. This will handle # mkdir -p foobar/blah/bar; touch foobar/afile def _recursive_simulate(src_path: bytes) -> list[InotifyEvent]: events = [] for root, dirnames, filenames in os.walk(src_path): for dirname in dirnames: with contextlib.suppress(OSError): full_path = os.path.join(root, dirname) wd_dir = self._add_watch(full_path, self._event_mask) e = InotifyEvent( wd_dir, InotifyConstants.IN_CREATE | InotifyConstants.IN_ISDIR, 0, dirname, full_path, ) events.append(e) for filename in filenames: full_path = os.path.join(root, filename) wd_parent_dir = self._wd_for_path[os.path.dirname(full_path)] e = InotifyEvent( wd_parent_dir, InotifyConstants.IN_CREATE, 0, filename, full_path, ) events.append(e) return events event_buffer = None while True: try: event_buffer = os.read(self._inotify_fd, event_buffer_size) except OSError as e: if e.errno == errno.EINTR: continue if e.errno == errno.EBADF: return [] raise break with self._lock: event_list = [] for wd, mask, cookie, name in Inotify._parse_event_buffer(event_buffer): if wd == -1: continue wd_path = self._path_for_wd[wd] src_path = os.path.join(wd_path, name) if name else wd_path # avoid trailing slash inotify_event = InotifyEvent(wd, mask, cookie, name, src_path) if inotify_event.is_moved_from: self.remember_move_from_event(inotify_event) elif inotify_event.is_moved_to: move_src_path = self.source_for_move(inotify_event) if move_src_path in self._wd_for_path: moved_wd = self._wd_for_path[move_src_path] del self._wd_for_path[move_src_path] self._wd_for_path[inotify_event.src_path] = moved_wd self._path_for_wd[moved_wd] = inotify_event.src_path if self.is_recursive: for _path in self._wd_for_path.copy(): if _path.startswith(move_src_path + os.path.sep.encode()): moved_wd = self._wd_for_path.pop(_path) _move_to_path = _path.replace(move_src_path, inotify_event.src_path) self._wd_for_path[_move_to_path] = moved_wd self._path_for_wd[moved_wd] = _move_to_path src_path = os.path.join(wd_path, name) inotify_event = InotifyEvent(wd, mask, cookie, name, src_path) if inotify_event.is_ignored: # Clean up book-keeping for deleted watches. path = self._path_for_wd.pop(wd) if self._wd_for_path[path] == wd: del self._wd_for_path[path] event_list.append(inotify_event) if self.is_recursive and inotify_event.is_directory and inotify_event.is_create: # TODO: When a directory from another part of the # filesystem is moved into a watched directory, this # will not generate events for the directory tree. # We need to coalesce IN_MOVED_TO events and those # IN_MOVED_TO events which don't pair up with # IN_MOVED_FROM events should be marked IN_CREATE # instead relative to this directory. try: self._add_watch(src_path, self._event_mask) except OSError: continue event_list.extend(_recursive_simulate(src_path)) return event_list # Non-synchronized methods. def _add_dir_watch(self, path: bytes, mask: int, *, recursive: bool) -> None: """Adds a watch (optionally recursively) for the given directory path to monitor events specified by the mask. :param path: Path to monitor :param recursive: ``True`` to monitor recursively. :param mask: Event bit mask. """ if not os.path.isdir(path): raise OSError(errno.ENOTDIR, os.strerror(errno.ENOTDIR), path) self._add_watch(path, mask) if recursive: for root, dirnames, _ in os.walk(path): for dirname in dirnames: full_path = os.path.join(root, dirname) if os.path.islink(full_path): continue self._add_watch(full_path, mask) def _add_watch(self, path: bytes, mask: int) -> int: """Adds a watch for the given path to monitor events specified by the mask. :param path: Path to monitor :param mask: Event bit mask. """ wd = inotify_add_watch(self._inotify_fd, path, mask) if wd == -1: Inotify._raise_error() self._wd_for_path[path] = wd self._path_for_wd[wd] = path return wd @staticmethod def _raise_error() -> None: """Raises errors for inotify failures.""" err = ctypes.get_errno() if err == errno.ENOSPC: raise OSError(errno.ENOSPC, "inotify watch limit reached") if err == errno.EMFILE: raise OSError(errno.EMFILE, "inotify instance limit reached") if err != errno.EACCES: raise OSError(err, os.strerror(err)) @staticmethod def _parse_event_buffer(event_buffer: bytes) -> Generator[tuple[int, int, int, bytes]]: """Parses an event buffer of ``inotify_event`` structs returned by inotify:: struct inotify_event { __s32 wd; /* watch descriptor */ __u32 mask; /* watch mask */ __u32 cookie; /* cookie to synchronize two events */ __u32 len; /* length (including nulls) of name */ char name[0]; /* stub for possible name */ }; The ``cookie`` member of this struct is used to pair two related events, for example, it pairs an IN_MOVED_FROM event with an IN_MOVED_TO event. """ i = 0 while i + 16 <= len(event_buffer): wd, mask, cookie, length = struct.unpack_from("iIII", event_buffer, i) name = event_buffer[i + 16 : i + 16 + length].rstrip(b"\0") i += 16 + length yield wd, mask, cookie, name class InotifyEvent: """Inotify event struct wrapper. :param wd: Watch descriptor :param mask: Event mask :param cookie: Event cookie :param name: Base name of the event source path. :param src_path: Full event source path. """ def __init__(self, wd: int, mask: int, cookie: int, name: bytes, src_path: bytes) -> None: self._wd = wd self._mask = mask self._cookie = cookie self._name = name self._src_path = src_path @property def src_path(self) -> bytes: return self._src_path @property def wd(self) -> int: return self._wd @property def mask(self) -> int: return self._mask @property def cookie(self) -> int: return self._cookie @property def name(self) -> bytes: return self._name @property def is_modify(self) -> bool: return self._mask & InotifyConstants.IN_MODIFY > 0 @property def is_close_write(self) -> bool: return self._mask & InotifyConstants.IN_CLOSE_WRITE > 0 @property def is_close_nowrite(self) -> bool: return self._mask & InotifyConstants.IN_CLOSE_NOWRITE > 0 @property def is_open(self) -> bool: return self._mask & InotifyConstants.IN_OPEN > 0 @property def is_access(self) -> bool: return self._mask & InotifyConstants.IN_ACCESS > 0 @property def is_delete(self) -> bool: return self._mask & InotifyConstants.IN_DELETE > 0 @property def is_delete_self(self) -> bool: return self._mask & InotifyConstants.IN_DELETE_SELF > 0 @property def is_create(self) -> bool: return self._mask & InotifyConstants.IN_CREATE > 0 @property def is_moved_from(self) -> bool: return self._mask & InotifyConstants.IN_MOVED_FROM > 0 @property def is_moved_to(self) -> bool: return self._mask & InotifyConstants.IN_MOVED_TO > 0 @property def is_move(self) -> bool: return self._mask & InotifyConstants.IN_MOVE > 0 @property def is_move_self(self) -> bool: return self._mask & InotifyConstants.IN_MOVE_SELF > 0 @property def is_attrib(self) -> bool: return self._mask & InotifyConstants.IN_ATTRIB > 0 @property def is_ignored(self) -> bool: return self._mask & InotifyConstants.IN_IGNORED > 0 @property def is_directory(self) -> bool: # It looks like the kernel does not provide this information for # IN_DELETE_SELF and IN_MOVE_SELF. In this case, assume it's a dir. # See also: https://github.com/seb-m/pyinotify/blob/2c7e8f8/python2/pyinotify.py#L897 return self.is_delete_self or self.is_move_self or self._mask & InotifyConstants.IN_ISDIR > 0 @property def key(self) -> tuple[bytes, int, int, int, bytes]: return self._src_path, self._wd, self._mask, self._cookie, self._name def __eq__(self, inotify_event: object) -> bool: if not isinstance(inotify_event, InotifyEvent): return NotImplemented return self.key == inotify_event.key def __ne__(self, inotify_event: object) -> bool: if not isinstance(inotify_event, InotifyEvent): return NotImplemented return self.key != inotify_event.key def __hash__(self) -> int: return hash(self.key) @staticmethod def _get_mask_string(mask: int) -> str: masks = [] for c in dir(InotifyConstants): if c.startswith("IN_") and c not in {"IN_ALL_EVENTS", "IN_MOVE"}: c_val = getattr(InotifyConstants, c) if mask & c_val: masks.append(c) return "|".join(masks) def __repr__(self) -> str: return ( f"<{type(self).__name__}: src_path={self.src_path!r}, wd={self.wd}," f" mask={self._get_mask_string(self.mask)}, cookie={self.cookie}," f" name={os.fsdecode(self.name)!r}>" )