# Copyright 2015-present MongoDB, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you # may not use this file except in compliance with the License. You # may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or # implied. See the License for the specific language governing # permissions and limitations under the License. """Tools to monitor driver events. .. versionadded:: 3.1 .. attention:: Starting in PyMongo 3.11, the monitoring classes outlined below are included in the PyMongo distribution under the :mod:`~pymongo.event_loggers` submodule. Use :func:`register` to register global listeners for specific events. Listeners must inherit from one of the abstract classes below and implement the correct functions for that class. For example, a simple command logger might be implemented like this:: import logging from pymongo import monitoring class CommandLogger(monitoring.CommandListener): def started(self, event): logging.info("Command {0.command_name} with request id " "{0.request_id} started on server " "{0.connection_id}".format(event)) def succeeded(self, event): logging.info("Command {0.command_name} with request id " "{0.request_id} on server {0.connection_id} " "succeeded in {0.duration_micros} " "microseconds".format(event)) def failed(self, event): logging.info("Command {0.command_name} with request id " "{0.request_id} on server {0.connection_id} " "failed in {0.duration_micros} " "microseconds".format(event)) monitoring.register(CommandLogger()) Server discovery and monitoring events are also available. For example:: class ServerLogger(monitoring.ServerListener): def opened(self, event): logging.info("Server {0.server_address} added to topology " "{0.topology_id}".format(event)) def description_changed(self, event): previous_server_type = event.previous_description.server_type new_server_type = event.new_description.server_type if new_server_type != previous_server_type: # server_type_name was added in PyMongo 3.4 logging.info( "Server {0.server_address} changed type from " "{0.previous_description.server_type_name} to " "{0.new_description.server_type_name}".format(event)) def closed(self, event): logging.warning("Server {0.server_address} removed from topology " "{0.topology_id}".format(event)) class HeartbeatLogger(monitoring.ServerHeartbeatListener): def started(self, event): logging.info("Heartbeat sent to server " "{0.connection_id}".format(event)) def succeeded(self, event): # The reply.document attribute was added in PyMongo 3.4. logging.info("Heartbeat to server {0.connection_id} " "succeeded with reply " "{0.reply.document}".format(event)) def failed(self, event): logging.warning("Heartbeat to server {0.connection_id} " "failed with error {0.reply}".format(event)) class TopologyLogger(monitoring.TopologyListener): def opened(self, event): logging.info("Topology with id {0.topology_id} " "opened".format(event)) def description_changed(self, event): logging.info("Topology description updated for " "topology id {0.topology_id}".format(event)) previous_topology_type = event.previous_description.topology_type new_topology_type = event.new_description.topology_type if new_topology_type != previous_topology_type: # topology_type_name was added in PyMongo 3.4 logging.info( "Topology {0.topology_id} changed type from " "{0.previous_description.topology_type_name} to " "{0.new_description.topology_type_name}".format(event)) # The has_writable_server and has_readable_server methods # were added in PyMongo 3.4. if not event.new_description.has_writable_server(): logging.warning("No writable servers available.") if not event.new_description.has_readable_server(): logging.warning("No readable servers available.") def closed(self, event): logging.info("Topology with id {0.topology_id} " "closed".format(event)) Connection monitoring and pooling events are also available. For example:: class ConnectionPoolLogger(ConnectionPoolListener): def pool_created(self, event): logging.info("[pool {0.address}] pool created".format(event)) def pool_ready(self, event): logging.info("[pool {0.address}] pool is ready".format(event)) def pool_cleared(self, event): logging.info("[pool {0.address}] pool cleared".format(event)) def pool_closed(self, event): logging.info("[pool {0.address}] pool closed".format(event)) def connection_created(self, event): logging.info("[pool {0.address}][connection #{0.connection_id}] " "connection created".format(event)) def connection_ready(self, event): logging.info("[pool {0.address}][connection #{0.connection_id}] " "connection setup succeeded".format(event)) def connection_closed(self, event): logging.info("[pool {0.address}][connection #{0.connection_id}] " "connection closed, reason: " "{0.reason}".format(event)) def connection_check_out_started(self, event): logging.info("[pool {0.address}] connection check out " "started".format(event)) def connection_check_out_failed(self, event): logging.info("[pool {0.address}] connection check out " "failed, reason: {0.reason}".format(event)) def connection_checked_out(self, event): logging.info("[pool {0.address}][connection #{0.connection_id}] " "connection checked out of pool".format(event)) def connection_checked_in(self, event): logging.info("[pool {0.address}][connection #{0.connection_id}] " "connection checked into pool".format(event)) Event listeners can also be registered per instance of :class:`~pymongo.mongo_client.MongoClient`:: client = MongoClient(event_listeners=[CommandLogger()]) Note that previously registered global listeners are automatically included when configuring per client event listeners. Registering a new global listener will not add that listener to existing client instances. .. note:: Events are delivered **synchronously**. Application threads block waiting for event handlers (e.g. :meth:`~CommandListener.started`) to return. Care must be taken to ensure that your event handlers are efficient enough to not adversely affect overall application performance. .. warning:: The command documents published through this API are *not* copies. If you intend to modify them in any way you must copy them in your event handler first. """ from __future__ import annotations import datetime from collections import abc, namedtuple from typing import TYPE_CHECKING, Any, Mapping, Optional, Sequence from bson.objectid import ObjectId from pymongo.hello import Hello, HelloCompat from pymongo.helpers import _SENSITIVE_COMMANDS, _handle_exception from pymongo.typings import _Address, _DocumentOut if TYPE_CHECKING: from datetime import timedelta from pymongo.server_description import ServerDescription from pymongo.topology_description import TopologyDescription _Listeners = namedtuple( "_Listeners", ( "command_listeners", "server_listeners", "server_heartbeat_listeners", "topology_listeners", "cmap_listeners", ), ) _LISTENERS = _Listeners([], [], [], [], []) class _EventListener: """Abstract base class for all event listeners.""" class CommandListener(_EventListener): """Abstract base class for command listeners. Handles `CommandStartedEvent`, `CommandSucceededEvent`, and `CommandFailedEvent`. """ def started(self, event: CommandStartedEvent) -> None: """Abstract method to handle a `CommandStartedEvent`. :param event: An instance of :class:`CommandStartedEvent`. """ raise NotImplementedError def succeeded(self, event: CommandSucceededEvent) -> None: """Abstract method to handle a `CommandSucceededEvent`. :param event: An instance of :class:`CommandSucceededEvent`. """ raise NotImplementedError def failed(self, event: CommandFailedEvent) -> None: """Abstract method to handle a `CommandFailedEvent`. :param event: An instance of :class:`CommandFailedEvent`. """ raise NotImplementedError class ConnectionPoolListener(_EventListener): """Abstract base class for connection pool listeners. Handles all of the connection pool events defined in the Connection Monitoring and Pooling Specification: :class:`PoolCreatedEvent`, :class:`PoolClearedEvent`, :class:`PoolClosedEvent`, :class:`ConnectionCreatedEvent`, :class:`ConnectionReadyEvent`, :class:`ConnectionClosedEvent`, :class:`ConnectionCheckOutStartedEvent`, :class:`ConnectionCheckOutFailedEvent`, :class:`ConnectionCheckedOutEvent`, and :class:`ConnectionCheckedInEvent`. .. versionadded:: 3.9 """ def pool_created(self, event: PoolCreatedEvent) -> None: """Abstract method to handle a :class:`PoolCreatedEvent`. Emitted when a connection Pool is created. :param event: An instance of :class:`PoolCreatedEvent`. """ raise NotImplementedError def pool_ready(self, event: PoolReadyEvent) -> None: """Abstract method to handle a :class:`PoolReadyEvent`. Emitted when a connection Pool is marked ready. :param event: An instance of :class:`PoolReadyEvent`. .. versionadded:: 4.0 """ raise NotImplementedError def pool_cleared(self, event: PoolClearedEvent) -> None: """Abstract method to handle a `PoolClearedEvent`. Emitted when a connection Pool is cleared. :param event: An instance of :class:`PoolClearedEvent`. """ raise NotImplementedError def pool_closed(self, event: PoolClosedEvent) -> None: """Abstract method to handle a `PoolClosedEvent`. Emitted when a connection Pool is closed. :param event: An instance of :class:`PoolClosedEvent`. """ raise NotImplementedError def connection_created(self, event: ConnectionCreatedEvent) -> None: """Abstract method to handle a :class:`ConnectionCreatedEvent`. Emitted when a connection Pool creates a Connection object. :param event: An instance of :class:`ConnectionCreatedEvent`. """ raise NotImplementedError def connection_ready(self, event: ConnectionReadyEvent) -> None: """Abstract method to handle a :class:`ConnectionReadyEvent`. Emitted when a connection has finished its setup, and is now ready to use. :param event: An instance of :class:`ConnectionReadyEvent`. """ raise NotImplementedError def connection_closed(self, event: ConnectionClosedEvent) -> None: """Abstract method to handle a :class:`ConnectionClosedEvent`. Emitted when a connection Pool closes a connection. :param event: An instance of :class:`ConnectionClosedEvent`. """ raise NotImplementedError def connection_check_out_started(self, event: ConnectionCheckOutStartedEvent) -> None: """Abstract method to handle a :class:`ConnectionCheckOutStartedEvent`. Emitted when the driver starts attempting to check out a connection. :param event: An instance of :class:`ConnectionCheckOutStartedEvent`. """ raise NotImplementedError def connection_check_out_failed(self, event: ConnectionCheckOutFailedEvent) -> None: """Abstract method to handle a :class:`ConnectionCheckOutFailedEvent`. Emitted when the driver's attempt to check out a connection fails. :param event: An instance of :class:`ConnectionCheckOutFailedEvent`. """ raise NotImplementedError def connection_checked_out(self, event: ConnectionCheckedOutEvent) -> None: """Abstract method to handle a :class:`ConnectionCheckedOutEvent`. Emitted when the driver successfully checks out a connection. :param event: An instance of :class:`ConnectionCheckedOutEvent`. """ raise NotImplementedError def connection_checked_in(self, event: ConnectionCheckedInEvent) -> None: """Abstract method to handle a :class:`ConnectionCheckedInEvent`. Emitted when the driver checks in a connection back to the connection Pool. :param event: An instance of :class:`ConnectionCheckedInEvent`. """ raise NotImplementedError class ServerHeartbeatListener(_EventListener): """Abstract base class for server heartbeat listeners. Handles `ServerHeartbeatStartedEvent`, `ServerHeartbeatSucceededEvent`, and `ServerHeartbeatFailedEvent`. .. versionadded:: 3.3 """ def started(self, event: ServerHeartbeatStartedEvent) -> None: """Abstract method to handle a `ServerHeartbeatStartedEvent`. :param event: An instance of :class:`ServerHeartbeatStartedEvent`. """ raise NotImplementedError def succeeded(self, event: ServerHeartbeatSucceededEvent) -> None: """Abstract method to handle a `ServerHeartbeatSucceededEvent`. :param event: An instance of :class:`ServerHeartbeatSucceededEvent`. """ raise NotImplementedError def failed(self, event: ServerHeartbeatFailedEvent) -> None: """Abstract method to handle a `ServerHeartbeatFailedEvent`. :param event: An instance of :class:`ServerHeartbeatFailedEvent`. """ raise NotImplementedError class TopologyListener(_EventListener): """Abstract base class for topology monitoring listeners. Handles `TopologyOpenedEvent`, `TopologyDescriptionChangedEvent`, and `TopologyClosedEvent`. .. versionadded:: 3.3 """ def opened(self, event: TopologyOpenedEvent) -> None: """Abstract method to handle a `TopologyOpenedEvent`. :param event: An instance of :class:`TopologyOpenedEvent`. """ raise NotImplementedError def description_changed(self, event: TopologyDescriptionChangedEvent) -> None: """Abstract method to handle a `TopologyDescriptionChangedEvent`. :param event: An instance of :class:`TopologyDescriptionChangedEvent`. """ raise NotImplementedError def closed(self, event: TopologyClosedEvent) -> None: """Abstract method to handle a `TopologyClosedEvent`. :param event: An instance of :class:`TopologyClosedEvent`. """ raise NotImplementedError class ServerListener(_EventListener): """Abstract base class for server listeners. Handles `ServerOpeningEvent`, `ServerDescriptionChangedEvent`, and `ServerClosedEvent`. .. versionadded:: 3.3 """ def opened(self, event: ServerOpeningEvent) -> None: """Abstract method to handle a `ServerOpeningEvent`. :param event: An instance of :class:`ServerOpeningEvent`. """ raise NotImplementedError def description_changed(self, event: ServerDescriptionChangedEvent) -> None: """Abstract method to handle a `ServerDescriptionChangedEvent`. :param event: An instance of :class:`ServerDescriptionChangedEvent`. """ raise NotImplementedError def closed(self, event: ServerClosedEvent) -> None: """Abstract method to handle a `ServerClosedEvent`. :param event: An instance of :class:`ServerClosedEvent`. """ raise NotImplementedError def _to_micros(dur: timedelta) -> int: """Convert duration 'dur' to microseconds.""" return int(dur.total_seconds() * 10e5) def _validate_event_listeners( option: str, listeners: Sequence[_EventListeners] ) -> Sequence[_EventListeners]: """Validate event listeners""" if not isinstance(listeners, abc.Sequence): raise TypeError(f"{option} must be a list or tuple") for listener in listeners: if not isinstance(listener, _EventListener): raise TypeError( f"Listeners for {option} must be either a " "CommandListener, ServerHeartbeatListener, " "ServerListener, TopologyListener, or " "ConnectionPoolListener." ) return listeners def register(listener: _EventListener) -> None: """Register a global event listener. :param listener: A subclasses of :class:`CommandListener`, :class:`ServerHeartbeatListener`, :class:`ServerListener`, :class:`TopologyListener`, or :class:`ConnectionPoolListener`. """ if not isinstance(listener, _EventListener): raise TypeError( f"Listeners for {listener} must be either a " "CommandListener, ServerHeartbeatListener, " "ServerListener, TopologyListener, or " "ConnectionPoolListener." ) if isinstance(listener, CommandListener): _LISTENERS.command_listeners.append(listener) if isinstance(listener, ServerHeartbeatListener): _LISTENERS.server_heartbeat_listeners.append(listener) if isinstance(listener, ServerListener): _LISTENERS.server_listeners.append(listener) if isinstance(listener, TopologyListener): _LISTENERS.topology_listeners.append(listener) if isinstance(listener, ConnectionPoolListener): _LISTENERS.cmap_listeners.append(listener) # The "hello" command is also deemed sensitive when attempting speculative # authentication. def _is_speculative_authenticate(command_name: str, doc: Mapping[str, Any]) -> bool: if ( command_name.lower() in ("hello", HelloCompat.LEGACY_CMD) and "speculativeAuthenticate" in doc ): return True return False class _CommandEvent: """Base class for command events.""" __slots__ = ( "__cmd_name", "__rqst_id", "__conn_id", "__op_id", "__service_id", "__db", "__server_conn_id", ) def __init__( self, command_name: str, request_id: int, connection_id: _Address, operation_id: Optional[int], service_id: Optional[ObjectId] = None, database_name: str = "", server_connection_id: Optional[int] = None, ) -> None: self.__cmd_name = command_name self.__rqst_id = request_id self.__conn_id = connection_id self.__op_id = operation_id self.__service_id = service_id self.__db = database_name self.__server_conn_id = server_connection_id @property def command_name(self) -> str: """The command name.""" return self.__cmd_name @property def request_id(self) -> int: """The request id for this operation.""" return self.__rqst_id @property def connection_id(self) -> _Address: """The address (host, port) of the server this command was sent to.""" return self.__conn_id @property def service_id(self) -> Optional[ObjectId]: """The service_id this command was sent to, or ``None``. .. versionadded:: 3.12 """ return self.__service_id @property def operation_id(self) -> Optional[int]: """An id for this series of events or None.""" return self.__op_id @property def database_name(self) -> str: """The database_name this command was sent to, or ``""``. .. versionadded:: 4.6 """ return self.__db @property def server_connection_id(self) -> Optional[int]: """The server-side connection id for the connection this command was sent on, or ``None``. .. versionadded:: 4.7 """ return self.__server_conn_id class CommandStartedEvent(_CommandEvent): """Event published when a command starts. :param command: The command document. :param database_name: The name of the database this command was run against. :param request_id: The request id for this operation. :param connection_id: The address (host, port) of the server this command was sent to. :param operation_id: An optional identifier for a series of related events. :param service_id: The service_id this command was sent to, or ``None``. """ __slots__ = ("__cmd",) def __init__( self, command: _DocumentOut, database_name: str, request_id: int, connection_id: _Address, operation_id: Optional[int], service_id: Optional[ObjectId] = None, server_connection_id: Optional[int] = None, ) -> None: if not command: raise ValueError(f"{command!r} is not a valid command") # Command name must be first key. command_name = next(iter(command)) super().__init__( command_name, request_id, connection_id, operation_id, service_id=service_id, database_name=database_name, server_connection_id=server_connection_id, ) cmd_name = command_name.lower() if cmd_name in _SENSITIVE_COMMANDS or _is_speculative_authenticate(cmd_name, command): self.__cmd: _DocumentOut = {} else: self.__cmd = command @property def command(self) -> _DocumentOut: """The command document.""" return self.__cmd @property def database_name(self) -> str: """The name of the database this command was run against.""" return super().database_name def __repr__(self) -> str: return ( "<{} {} db: {!r}, command: {!r}, operation_id: {}, service_id: {}, server_connection_id: {}>" ).format( self.__class__.__name__, self.connection_id, self.database_name, self.command_name, self.operation_id, self.service_id, self.server_connection_id, ) class CommandSucceededEvent(_CommandEvent): """Event published when a command succeeds. :param duration: The command duration as a datetime.timedelta. :param reply: The server reply document. :param command_name: The command name. :param request_id: The request id for this operation. :param connection_id: The address (host, port) of the server this command was sent to. :param operation_id: An optional identifier for a series of related events. :param service_id: The service_id this command was sent to, or ``None``. :param database_name: The database this command was sent to, or ``""``. """ __slots__ = ("__duration_micros", "__reply") def __init__( self, duration: datetime.timedelta, reply: _DocumentOut, command_name: str, request_id: int, connection_id: _Address, operation_id: Optional[int], service_id: Optional[ObjectId] = None, database_name: str = "", server_connection_id: Optional[int] = None, ) -> None: super().__init__( command_name, request_id, connection_id, operation_id, service_id=service_id, database_name=database_name, server_connection_id=server_connection_id, ) self.__duration_micros = _to_micros(duration) cmd_name = command_name.lower() if cmd_name in _SENSITIVE_COMMANDS or _is_speculative_authenticate(cmd_name, reply): self.__reply: _DocumentOut = {} else: self.__reply = reply @property def duration_micros(self) -> int: """The duration of this operation in microseconds.""" return self.__duration_micros @property def reply(self) -> _DocumentOut: """The server failure document for this operation.""" return self.__reply def __repr__(self) -> str: return ( "<{} {} db: {!r}, command: {!r}, operation_id: {}, duration_micros: {}, service_id: {}, server_connection_id: {}>" ).format( self.__class__.__name__, self.connection_id, self.database_name, self.command_name, self.operation_id, self.duration_micros, self.service_id, self.server_connection_id, ) class CommandFailedEvent(_CommandEvent): """Event published when a command fails. :param duration: The command duration as a datetime.timedelta. :param failure: The server reply document. :param command_name: The command name. :param request_id: The request id for this operation. :param connection_id: The address (host, port) of the server this command was sent to. :param operation_id: An optional identifier for a series of related events. :param service_id: The service_id this command was sent to, or ``None``. :param database_name: The database this command was sent to, or ``""``. """ __slots__ = ("__duration_micros", "__failure") def __init__( self, duration: datetime.timedelta, failure: _DocumentOut, command_name: str, request_id: int, connection_id: _Address, operation_id: Optional[int], service_id: Optional[ObjectId] = None, database_name: str = "", server_connection_id: Optional[int] = None, ) -> None: super().__init__( command_name, request_id, connection_id, operation_id, service_id=service_id, database_name=database_name, server_connection_id=server_connection_id, ) self.__duration_micros = _to_micros(duration) self.__failure = failure @property def duration_micros(self) -> int: """The duration of this operation in microseconds.""" return self.__duration_micros @property def failure(self) -> _DocumentOut: """The server failure document for this operation.""" return self.__failure def __repr__(self) -> str: return ( "<{} {} db: {!r}, command: {!r}, operation_id: {}, duration_micros: {}, " "failure: {!r}, service_id: {}, server_connection_id: {}>" ).format( self.__class__.__name__, self.connection_id, self.database_name, self.command_name, self.operation_id, self.duration_micros, self.failure, self.service_id, self.server_connection_id, ) class _PoolEvent: """Base class for pool events.""" __slots__ = ("__address",) def __init__(self, address: _Address) -> None: self.__address = address @property def address(self) -> _Address: """The address (host, port) pair of the server the pool is attempting to connect to. """ return self.__address def __repr__(self) -> str: return f"{self.__class__.__name__}({self.__address!r})" class PoolCreatedEvent(_PoolEvent): """Published when a Connection Pool is created. :param address: The address (host, port) pair of the server this Pool is attempting to connect to. .. versionadded:: 3.9 """ __slots__ = ("__options",) def __init__(self, address: _Address, options: dict[str, Any]) -> None: super().__init__(address) self.__options = options @property def options(self) -> dict[str, Any]: """Any non-default pool options that were set on this Connection Pool.""" return self.__options def __repr__(self) -> str: return f"{self.__class__.__name__}({self.address!r}, {self.__options!r})" class PoolReadyEvent(_PoolEvent): """Published when a Connection Pool is marked ready. :param address: The address (host, port) pair of the server this Pool is attempting to connect to. .. versionadded:: 4.0 """ __slots__ = () class PoolClearedEvent(_PoolEvent): """Published when a Connection Pool is cleared. :param address: The address (host, port) pair of the server this Pool is attempting to connect to. :param service_id: The service_id this command was sent to, or ``None``. :param interrupt_connections: True if all active connections were interrupted by the Pool during clearing. .. versionadded:: 3.9 """ __slots__ = ("__service_id", "__interrupt_connections") def __init__( self, address: _Address, service_id: Optional[ObjectId] = None, interrupt_connections: bool = False, ) -> None: super().__init__(address) self.__service_id = service_id self.__interrupt_connections = interrupt_connections @property def service_id(self) -> Optional[ObjectId]: """Connections with this service_id are cleared. When service_id is ``None``, all connections in the pool are cleared. .. versionadded:: 3.12 """ return self.__service_id @property def interrupt_connections(self) -> bool: """If True, active connections are interrupted during clearing. .. versionadded:: 4.7 """ return self.__interrupt_connections def __repr__(self) -> str: return f"{self.__class__.__name__}({self.address!r}, {self.__service_id!r}, {self.__interrupt_connections!r})" class PoolClosedEvent(_PoolEvent): """Published when a Connection Pool is closed. :param address: The address (host, port) pair of the server this Pool is attempting to connect to. .. versionadded:: 3.9 """ __slots__ = () class ConnectionClosedReason: """An enum that defines values for `reason` on a :class:`ConnectionClosedEvent`. .. versionadded:: 3.9 """ STALE = "stale" """The pool was cleared, making the connection no longer valid.""" IDLE = "idle" """The connection became stale by being idle for too long (maxIdleTimeMS). """ ERROR = "error" """The connection experienced an error, making it no longer valid.""" POOL_CLOSED = "poolClosed" """The pool was closed, making the connection no longer valid.""" class ConnectionCheckOutFailedReason: """An enum that defines values for `reason` on a :class:`ConnectionCheckOutFailedEvent`. .. versionadded:: 3.9 """ TIMEOUT = "timeout" """The connection check out attempt exceeded the specified timeout.""" POOL_CLOSED = "poolClosed" """The pool was previously closed, and cannot provide new connections.""" CONN_ERROR = "connectionError" """The connection check out attempt experienced an error while setting up a new connection. """ class _ConnectionEvent: """Private base class for connection events.""" __slots__ = ("__address",) def __init__(self, address: _Address) -> None: self.__address = address @property def address(self) -> _Address: """The address (host, port) pair of the server this connection is attempting to connect to. """ return self.__address def __repr__(self) -> str: return f"{self.__class__.__name__}({self.__address!r})" class _ConnectionIdEvent(_ConnectionEvent): """Private base class for connection events with an id.""" __slots__ = ("__connection_id",) def __init__(self, address: _Address, connection_id: int) -> None: super().__init__(address) self.__connection_id = connection_id @property def connection_id(self) -> int: """The ID of the connection.""" return self.__connection_id def __repr__(self) -> str: return f"{self.__class__.__name__}({self.address!r}, {self.__connection_id!r})" class _ConnectionDurationEvent(_ConnectionIdEvent): """Private base class for connection events with a duration.""" __slots__ = ("__duration",) def __init__(self, address: _Address, connection_id: int, duration: Optional[float]) -> None: super().__init__(address, connection_id) self.__duration = duration @property def duration(self) -> Optional[float]: """The duration of the connection event. .. versionadded:: 4.7 """ return self.__duration def __repr__(self) -> str: return f"{self.__class__.__name__}({self.address!r}, {self.connection_id!r}, {self.__duration!r})" class ConnectionCreatedEvent(_ConnectionIdEvent): """Published when a Connection Pool creates a Connection object. NOTE: This connection is not ready for use until the :class:`ConnectionReadyEvent` is published. :param address: The address (host, port) pair of the server this Connection is attempting to connect to. :param connection_id: The integer ID of the Connection in this Pool. .. versionadded:: 3.9 """ __slots__ = () class ConnectionReadyEvent(_ConnectionDurationEvent): """Published when a Connection has finished its setup, and is ready to use. :param address: The address (host, port) pair of the server this Connection is attempting to connect to. :param connection_id: The integer ID of the Connection in this Pool. .. versionadded:: 3.9 """ __slots__ = () class ConnectionClosedEvent(_ConnectionIdEvent): """Published when a Connection is closed. :param address: The address (host, port) pair of the server this Connection is attempting to connect to. :param connection_id: The integer ID of the Connection in this Pool. :param reason: A reason explaining why this connection was closed. .. versionadded:: 3.9 """ __slots__ = ("__reason",) def __init__(self, address: _Address, connection_id: int, reason: str): super().__init__(address, connection_id) self.__reason = reason @property def reason(self) -> str: """A reason explaining why this connection was closed. The reason must be one of the strings from the :class:`ConnectionClosedReason` enum. """ return self.__reason def __repr__(self) -> str: return "{}({!r}, {!r}, {!r})".format( self.__class__.__name__, self.address, self.connection_id, self.__reason, ) class ConnectionCheckOutStartedEvent(_ConnectionEvent): """Published when the driver starts attempting to check out a connection. :param address: The address (host, port) pair of the server this Connection is attempting to connect to. .. versionadded:: 3.9 """ __slots__ = () class ConnectionCheckOutFailedEvent(_ConnectionDurationEvent): """Published when the driver's attempt to check out a connection fails. :param address: The address (host, port) pair of the server this Connection is attempting to connect to. :param reason: A reason explaining why connection check out failed. .. versionadded:: 3.9 """ __slots__ = ("__reason",) def __init__(self, address: _Address, reason: str, duration: Optional[float]) -> None: super().__init__(address=address, connection_id=0, duration=duration) self.__reason = reason @property def reason(self) -> str: """A reason explaining why connection check out failed. The reason must be one of the strings from the :class:`ConnectionCheckOutFailedReason` enum. """ return self.__reason def __repr__(self) -> str: return f"{self.__class__.__name__}({self.address!r}, {self.__reason!r}, {self.duration!r})" class ConnectionCheckedOutEvent(_ConnectionDurationEvent): """Published when the driver successfully checks out a connection. :param address: The address (host, port) pair of the server this Connection is attempting to connect to. :param connection_id: The integer ID of the Connection in this Pool. .. versionadded:: 3.9 """ __slots__ = () class ConnectionCheckedInEvent(_ConnectionIdEvent): """Published when the driver checks in a Connection into the Pool. :param address: The address (host, port) pair of the server this Connection is attempting to connect to. :param connection_id: The integer ID of the Connection in this Pool. .. versionadded:: 3.9 """ __slots__ = () class _ServerEvent: """Base class for server events.""" __slots__ = ("__server_address", "__topology_id") def __init__(self, server_address: _Address, topology_id: ObjectId) -> None: self.__server_address = server_address self.__topology_id = topology_id @property def server_address(self) -> _Address: """The address (host, port) pair of the server""" return self.__server_address @property def topology_id(self) -> ObjectId: """A unique identifier for the topology this server is a part of.""" return self.__topology_id def __repr__(self) -> str: return f"<{self.__class__.__name__} {self.server_address} topology_id: {self.topology_id}>" class ServerDescriptionChangedEvent(_ServerEvent): """Published when server description changes. .. versionadded:: 3.3 """ __slots__ = ("__previous_description", "__new_description") def __init__( self, previous_description: ServerDescription, new_description: ServerDescription, *args: Any, ) -> None: super().__init__(*args) self.__previous_description = previous_description self.__new_description = new_description @property def previous_description(self) -> ServerDescription: """The previous :class:`~pymongo.server_description.ServerDescription`. """ return self.__previous_description @property def new_description(self) -> ServerDescription: """The new :class:`~pymongo.server_description.ServerDescription`. """ return self.__new_description def __repr__(self) -> str: return "<{} {} changed from: {}, to: {}>".format( self.__class__.__name__, self.server_address, self.previous_description, self.new_description, ) class ServerOpeningEvent(_ServerEvent): """Published when server is initialized. .. versionadded:: 3.3 """ __slots__ = () class ServerClosedEvent(_ServerEvent): """Published when server is closed. .. versionadded:: 3.3 """ __slots__ = () class TopologyEvent: """Base class for topology description events.""" __slots__ = ("__topology_id",) def __init__(self, topology_id: ObjectId) -> None: self.__topology_id = topology_id @property def topology_id(self) -> ObjectId: """A unique identifier for the topology this server is a part of.""" return self.__topology_id def __repr__(self) -> str: return f"<{self.__class__.__name__} topology_id: {self.topology_id}>" class TopologyDescriptionChangedEvent(TopologyEvent): """Published when the topology description changes. .. versionadded:: 3.3 """ __slots__ = ("__previous_description", "__new_description") def __init__( self, previous_description: TopologyDescription, new_description: TopologyDescription, *args: Any, ) -> None: super().__init__(*args) self.__previous_description = previous_description self.__new_description = new_description @property def previous_description(self) -> TopologyDescription: """The previous :class:`~pymongo.topology_description.TopologyDescription`. """ return self.__previous_description @property def new_description(self) -> TopologyDescription: """The new :class:`~pymongo.topology_description.TopologyDescription`. """ return self.__new_description def __repr__(self) -> str: return "<{} topology_id: {} changed from: {}, to: {}>".format( self.__class__.__name__, self.topology_id, self.previous_description, self.new_description, ) class TopologyOpenedEvent(TopologyEvent): """Published when the topology is initialized. .. versionadded:: 3.3 """ __slots__ = () class TopologyClosedEvent(TopologyEvent): """Published when the topology is closed. .. versionadded:: 3.3 """ __slots__ = () class _ServerHeartbeatEvent: """Base class for server heartbeat events.""" __slots__ = ("__connection_id", "__awaited") def __init__(self, connection_id: _Address, awaited: bool = False) -> None: self.__connection_id = connection_id self.__awaited = awaited @property def connection_id(self) -> _Address: """The address (host, port) of the server this heartbeat was sent to. """ return self.__connection_id @property def awaited(self) -> bool: """Whether the heartbeat was issued as an awaitable hello command. .. versionadded:: 4.6 """ return self.__awaited def __repr__(self) -> str: return f"<{self.__class__.__name__} {self.connection_id} awaited: {self.awaited}>" class ServerHeartbeatStartedEvent(_ServerHeartbeatEvent): """Published when a heartbeat is started. .. versionadded:: 3.3 """ __slots__ = () class ServerHeartbeatSucceededEvent(_ServerHeartbeatEvent): """Fired when the server heartbeat succeeds. .. versionadded:: 3.3 """ __slots__ = ("__duration", "__reply") def __init__( self, duration: float, reply: Hello, connection_id: _Address, awaited: bool = False ) -> None: super().__init__(connection_id, awaited) self.__duration = duration self.__reply = reply @property def duration(self) -> float: """The duration of this heartbeat in microseconds.""" return self.__duration @property def reply(self) -> Hello: """An instance of :class:`~pymongo.hello.Hello`.""" return self.__reply @property def awaited(self) -> bool: """Whether the heartbeat was awaited. If true, then :meth:`duration` reflects the sum of the round trip time to the server and the time that the server waited before sending a response. .. versionadded:: 3.11 """ return super().awaited def __repr__(self) -> str: return "<{} {} duration: {}, awaited: {}, reply: {}>".format( self.__class__.__name__, self.connection_id, self.duration, self.awaited, self.reply, ) class ServerHeartbeatFailedEvent(_ServerHeartbeatEvent): """Fired when the server heartbeat fails, either with an "ok: 0" or a socket exception. .. versionadded:: 3.3 """ __slots__ = ("__duration", "__reply") def __init__( self, duration: float, reply: Exception, connection_id: _Address, awaited: bool = False ) -> None: super().__init__(connection_id, awaited) self.__duration = duration self.__reply = reply @property def duration(self) -> float: """The duration of this heartbeat in microseconds.""" return self.__duration @property def reply(self) -> Exception: """A subclass of :exc:`Exception`.""" return self.__reply @property def awaited(self) -> bool: """Whether the heartbeat was awaited. If true, then :meth:`duration` reflects the sum of the round trip time to the server and the time that the server waited before sending a response. .. versionadded:: 3.11 """ return super().awaited def __repr__(self) -> str: return "<{} {} duration: {}, awaited: {}, reply: {!r}>".format( self.__class__.__name__, self.connection_id, self.duration, self.awaited, self.reply, ) class _EventListeners: """Configure event listeners for a client instance. Any event listeners registered globally are included by default. :param listeners: A list of event listeners. """ def __init__(self, listeners: Optional[Sequence[_EventListener]]): self.__command_listeners = _LISTENERS.command_listeners[:] self.__server_listeners = _LISTENERS.server_listeners[:] lst = _LISTENERS.server_heartbeat_listeners self.__server_heartbeat_listeners = lst[:] self.__topology_listeners = _LISTENERS.topology_listeners[:] self.__cmap_listeners = _LISTENERS.cmap_listeners[:] if listeners is not None: for lst in listeners: if isinstance(lst, CommandListener): self.__command_listeners.append(lst) if isinstance(lst, ServerListener): self.__server_listeners.append(lst) if isinstance(lst, ServerHeartbeatListener): self.__server_heartbeat_listeners.append(lst) if isinstance(lst, TopologyListener): self.__topology_listeners.append(lst) if isinstance(lst, ConnectionPoolListener): self.__cmap_listeners.append(lst) self.__enabled_for_commands = bool(self.__command_listeners) self.__enabled_for_server = bool(self.__server_listeners) self.__enabled_for_server_heartbeat = bool(self.__server_heartbeat_listeners) self.__enabled_for_topology = bool(self.__topology_listeners) self.__enabled_for_cmap = bool(self.__cmap_listeners) @property def enabled_for_commands(self) -> bool: """Are any CommandListener instances registered?""" return self.__enabled_for_commands @property def enabled_for_server(self) -> bool: """Are any ServerListener instances registered?""" return self.__enabled_for_server @property def enabled_for_server_heartbeat(self) -> bool: """Are any ServerHeartbeatListener instances registered?""" return self.__enabled_for_server_heartbeat @property def enabled_for_topology(self) -> bool: """Are any TopologyListener instances registered?""" return self.__enabled_for_topology @property def enabled_for_cmap(self) -> bool: """Are any ConnectionPoolListener instances registered?""" return self.__enabled_for_cmap def event_listeners(self) -> list[_EventListeners]: """List of registered event listeners.""" return ( self.__command_listeners + self.__server_heartbeat_listeners + self.__server_listeners + self.__topology_listeners + self.__cmap_listeners ) def publish_command_start( self, command: _DocumentOut, database_name: str, request_id: int, connection_id: _Address, server_connection_id: Optional[int], op_id: Optional[int] = None, service_id: Optional[ObjectId] = None, ) -> None: """Publish a CommandStartedEvent to all command listeners. :param command: The command document. :param database_name: The name of the database this command was run against. :param request_id: The request id for this operation. :param connection_id: The address (host, port) of the server this command was sent to. :param op_id: The (optional) operation id for this operation. :param service_id: The service_id this command was sent to, or ``None``. """ if op_id is None: op_id = request_id event = CommandStartedEvent( command, database_name, request_id, connection_id, op_id, service_id=service_id, server_connection_id=server_connection_id, ) for subscriber in self.__command_listeners: try: subscriber.started(event) except Exception: _handle_exception() def publish_command_success( self, duration: timedelta, reply: _DocumentOut, command_name: str, request_id: int, connection_id: _Address, server_connection_id: Optional[int], op_id: Optional[int] = None, service_id: Optional[ObjectId] = None, speculative_hello: bool = False, database_name: str = "", ) -> None: """Publish a CommandSucceededEvent to all command listeners. :param duration: The command duration as a datetime.timedelta. :param reply: The server reply document. :param command_name: The command name. :param request_id: The request id for this operation. :param connection_id: The address (host, port) of the server this command was sent to. :param op_id: The (optional) operation id for this operation. :param service_id: The service_id this command was sent to, or ``None``. :param speculative_hello: Was the command sent with speculative auth? :param database_name: The database this command was sent to, or ``""``. """ if op_id is None: op_id = request_id if speculative_hello: # Redact entire response when the command started contained # speculativeAuthenticate. reply = {} event = CommandSucceededEvent( duration, reply, command_name, request_id, connection_id, op_id, service_id, database_name=database_name, server_connection_id=server_connection_id, ) for subscriber in self.__command_listeners: try: subscriber.succeeded(event) except Exception: _handle_exception() def publish_command_failure( self, duration: timedelta, failure: _DocumentOut, command_name: str, request_id: int, connection_id: _Address, server_connection_id: Optional[int], op_id: Optional[int] = None, service_id: Optional[ObjectId] = None, database_name: str = "", ) -> None: """Publish a CommandFailedEvent to all command listeners. :param duration: The command duration as a datetime.timedelta. :param failure: The server reply document or failure description document. :param command_name: The command name. :param request_id: The request id for this operation. :param connection_id: The address (host, port) of the server this command was sent to. :param op_id: The (optional) operation id for this operation. :param service_id: The service_id this command was sent to, or ``None``. :param database_name: The database this command was sent to, or ``""``. """ if op_id is None: op_id = request_id event = CommandFailedEvent( duration, failure, command_name, request_id, connection_id, op_id, service_id=service_id, database_name=database_name, server_connection_id=server_connection_id, ) for subscriber in self.__command_listeners: try: subscriber.failed(event) except Exception: _handle_exception() def publish_server_heartbeat_started(self, connection_id: _Address, awaited: bool) -> None: """Publish a ServerHeartbeatStartedEvent to all server heartbeat listeners. :param connection_id: The address (host, port) pair of the connection. :param awaited: True if this heartbeat is part of an awaitable hello command. """ event = ServerHeartbeatStartedEvent(connection_id, awaited) for subscriber in self.__server_heartbeat_listeners: try: subscriber.started(event) except Exception: _handle_exception() def publish_server_heartbeat_succeeded( self, connection_id: _Address, duration: float, reply: Hello, awaited: bool ) -> None: """Publish a ServerHeartbeatSucceededEvent to all server heartbeat listeners. :param connection_id: The address (host, port) pair of the connection. :param duration: The execution time of the event in the highest possible resolution for the platform. :param reply: The command reply. :param awaited: True if the response was awaited. """ event = ServerHeartbeatSucceededEvent(duration, reply, connection_id, awaited) for subscriber in self.__server_heartbeat_listeners: try: subscriber.succeeded(event) except Exception: _handle_exception() def publish_server_heartbeat_failed( self, connection_id: _Address, duration: float, reply: Exception, awaited: bool ) -> None: """Publish a ServerHeartbeatFailedEvent to all server heartbeat listeners. :param connection_id: The address (host, port) pair of the connection. :param duration: The execution time of the event in the highest possible resolution for the platform. :param reply: The command reply. :param awaited: True if the response was awaited. """ event = ServerHeartbeatFailedEvent(duration, reply, connection_id, awaited) for subscriber in self.__server_heartbeat_listeners: try: subscriber.failed(event) except Exception: _handle_exception() def publish_server_opened(self, server_address: _Address, topology_id: ObjectId) -> None: """Publish a ServerOpeningEvent to all server listeners. :param server_address: The address (host, port) pair of the server. :param topology_id: A unique identifier for the topology this server is a part of. """ event = ServerOpeningEvent(server_address, topology_id) for subscriber in self.__server_listeners: try: subscriber.opened(event) except Exception: _handle_exception() def publish_server_closed(self, server_address: _Address, topology_id: ObjectId) -> None: """Publish a ServerClosedEvent to all server listeners. :param server_address: The address (host, port) pair of the server. :param topology_id: A unique identifier for the topology this server is a part of. """ event = ServerClosedEvent(server_address, topology_id) for subscriber in self.__server_listeners: try: subscriber.closed(event) except Exception: _handle_exception() def publish_server_description_changed( self, previous_description: ServerDescription, new_description: ServerDescription, server_address: _Address, topology_id: ObjectId, ) -> None: """Publish a ServerDescriptionChangedEvent to all server listeners. :param previous_description: The previous server description. :param server_address: The address (host, port) pair of the server. :param new_description: The new server description. :param topology_id: A unique identifier for the topology this server is a part of. """ event = ServerDescriptionChangedEvent( previous_description, new_description, server_address, topology_id ) for subscriber in self.__server_listeners: try: subscriber.description_changed(event) except Exception: _handle_exception() def publish_topology_opened(self, topology_id: ObjectId) -> None: """Publish a TopologyOpenedEvent to all topology listeners. :param topology_id: A unique identifier for the topology this server is a part of. """ event = TopologyOpenedEvent(topology_id) for subscriber in self.__topology_listeners: try: subscriber.opened(event) except Exception: _handle_exception() def publish_topology_closed(self, topology_id: ObjectId) -> None: """Publish a TopologyClosedEvent to all topology listeners. :param topology_id: A unique identifier for the topology this server is a part of. """ event = TopologyClosedEvent(topology_id) for subscriber in self.__topology_listeners: try: subscriber.closed(event) except Exception: _handle_exception() def publish_topology_description_changed( self, previous_description: TopologyDescription, new_description: TopologyDescription, topology_id: ObjectId, ) -> None: """Publish a TopologyDescriptionChangedEvent to all topology listeners. :param previous_description: The previous topology description. :param new_description: The new topology description. :param topology_id: A unique identifier for the topology this server is a part of. """ event = TopologyDescriptionChangedEvent(previous_description, new_description, topology_id) for subscriber in self.__topology_listeners: try: subscriber.description_changed(event) except Exception: _handle_exception() def publish_pool_created(self, address: _Address, options: dict[str, Any]) -> None: """Publish a :class:`PoolCreatedEvent` to all pool listeners.""" event = PoolCreatedEvent(address, options) for subscriber in self.__cmap_listeners: try: subscriber.pool_created(event) except Exception: _handle_exception() def publish_pool_ready(self, address: _Address) -> None: """Publish a :class:`PoolReadyEvent` to all pool listeners.""" event = PoolReadyEvent(address) for subscriber in self.__cmap_listeners: try: subscriber.pool_ready(event) except Exception: _handle_exception() def publish_pool_cleared( self, address: _Address, service_id: Optional[ObjectId], interrupt_connections: bool = False, ) -> None: """Publish a :class:`PoolClearedEvent` to all pool listeners.""" event = PoolClearedEvent(address, service_id, interrupt_connections) for subscriber in self.__cmap_listeners: try: subscriber.pool_cleared(event) except Exception: _handle_exception() def publish_pool_closed(self, address: _Address) -> None: """Publish a :class:`PoolClosedEvent` to all pool listeners.""" event = PoolClosedEvent(address) for subscriber in self.__cmap_listeners: try: subscriber.pool_closed(event) except Exception: _handle_exception() def publish_connection_created(self, address: _Address, connection_id: int) -> None: """Publish a :class:`ConnectionCreatedEvent` to all connection listeners. """ event = ConnectionCreatedEvent(address, connection_id) for subscriber in self.__cmap_listeners: try: subscriber.connection_created(event) except Exception: _handle_exception() def publish_connection_ready( self, address: _Address, connection_id: int, duration: float ) -> None: """Publish a :class:`ConnectionReadyEvent` to all connection listeners.""" event = ConnectionReadyEvent(address, connection_id, duration) for subscriber in self.__cmap_listeners: try: subscriber.connection_ready(event) except Exception: _handle_exception() def publish_connection_closed(self, address: _Address, connection_id: int, reason: str) -> None: """Publish a :class:`ConnectionClosedEvent` to all connection listeners. """ event = ConnectionClosedEvent(address, connection_id, reason) for subscriber in self.__cmap_listeners: try: subscriber.connection_closed(event) except Exception: _handle_exception() def publish_connection_check_out_started(self, address: _Address) -> None: """Publish a :class:`ConnectionCheckOutStartedEvent` to all connection listeners. """ event = ConnectionCheckOutStartedEvent(address) for subscriber in self.__cmap_listeners: try: subscriber.connection_check_out_started(event) except Exception: _handle_exception() def publish_connection_check_out_failed( self, address: _Address, reason: str, duration: float ) -> None: """Publish a :class:`ConnectionCheckOutFailedEvent` to all connection listeners. """ event = ConnectionCheckOutFailedEvent(address, reason, duration) for subscriber in self.__cmap_listeners: try: subscriber.connection_check_out_failed(event) except Exception: _handle_exception() def publish_connection_checked_out( self, address: _Address, connection_id: int, duration: float ) -> None: """Publish a :class:`ConnectionCheckedOutEvent` to all connection listeners. """ event = ConnectionCheckedOutEvent(address, connection_id, duration) for subscriber in self.__cmap_listeners: try: subscriber.connection_checked_out(event) except Exception: _handle_exception() def publish_connection_checked_in(self, address: _Address, connection_id: int) -> None: """Publish a :class:`ConnectionCheckedInEvent` to all connection listeners. """ event = ConnectionCheckedInEvent(address, connection_id) for subscriber in self.__cmap_listeners: try: subscriber.connection_checked_in(event) except Exception: _handle_exception()