# Copyright 2009-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Tools for using Python's :mod:`json` module with BSON documents.

This module provides two helper methods `dumps` and `loads` that wrap the
native :mod:`json` methods and provide explicit BSON conversion to and from
JSON. :class:`~bson.json_util.JSONOptions` provides a way to control how JSON
is emitted and parsed, with the default being the Relaxed Extended JSON format.
:mod:`~bson.json_util` can also generate Canonical or legacy `Extended JSON`_
when :const:`CANONICAL_JSON_OPTIONS` or :const:`LEGACY_JSON_OPTIONS` is
provided, respectively.

.. _Extended JSON: https://github.com/mongodb/specifications/blob/master/source/extended-json.rst

Example usage (deserialization):

.. doctest::

   >>> from bson.json_util import loads
   >>> loads(
   ...     '[{"foo": [1, 2]}, {"bar": {"hello": "world"}}, {"code": {"$scope": {}, "$code": "function x() { return 1; }"}}, {"bin": {"$type": "80", "$binary": "AQIDBA=="}}]'
   ... )
   [{'foo': [1, 2]}, {'bar': {'hello': 'world'}}, {'code': Code('function x() { return 1; }', {})}, {'bin': Binary(b'...', 128)}]

Example usage with :const:`RELAXED_JSON_OPTIONS` (the default):

.. doctest::

   >>> from bson import Binary, Code
   >>> from bson.json_util import dumps
   >>> dumps(
   ...     [
   ...         {"foo": [1, 2]},
   ...         {"bar": {"hello": "world"}},
   ...         {"code": Code("function x() { return 1; }")},
   ...         {"bin": Binary(b"\x01\x02\x03\x04")},
   ...     ]
   ... )
   '[{"foo": [1, 2]}, {"bar": {"hello": "world"}}, {"code": {"$code": "function x() { return 1; }"}}, {"bin": {"$binary": {"base64": "AQIDBA==", "subType": "00"}}}]'

Example usage (with :const:`CANONICAL_JSON_OPTIONS`):

.. doctest::

   >>> from bson import Binary, Code
   >>> from bson.json_util import dumps, CANONICAL_JSON_OPTIONS
   >>> dumps(
   ...     [
   ...         {"foo": [1, 2]},
   ...         {"bar": {"hello": "world"}},
   ...         {"code": Code("function x() { return 1; }")},
   ...         {"bin": Binary(b"\x01\x02\x03\x04")},
   ...     ],
   ...     json_options=CANONICAL_JSON_OPTIONS,
   ... )
   '[{"foo": [{"$numberInt": "1"}, {"$numberInt": "2"}]}, {"bar": {"hello": "world"}}, {"code": {"$code": "function x() { return 1; }"}}, {"bin": {"$binary": {"base64": "AQIDBA==", "subType": "00"}}}]'

Example usage (with :const:`LEGACY_JSON_OPTIONS`):

.. doctest::

   >>> from bson import Binary, Code
   >>> from bson.json_util import dumps, LEGACY_JSON_OPTIONS
   >>> dumps(
   ...     [
   ...         {"foo": [1, 2]},
   ...         {"bar": {"hello": "world"}},
   ...         {"code": Code("function x() { return 1; }", {})},
   ...         {"bin": Binary(b"\x01\x02\x03\x04")},
   ...     ],
   ...     json_options=LEGACY_JSON_OPTIONS,
   ... )
   '[{"foo": [1, 2]}, {"bar": {"hello": "world"}}, {"code": {"$code": "function x() { return 1; }", "$scope": {}}}, {"bin": {"$binary": "AQIDBA==", "$type": "00"}}]'

Alternatively, you can manually pass the `default` to :func:`json.dumps`.
It won't handle :class:`~bson.binary.Binary` and :class:`~bson.code.Code`
instances (as they are extended strings, for which you can't provide custom
defaults), but it will be faster as there is less recursion.
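For instance, a minimal sketch of that approach (the ``ObjectId`` value below
is just an arbitrary example):

.. doctest::

   >>> import json
   >>> from bson import ObjectId
   >>> from bson.json_util import default
   >>> json.dumps({"_id": ObjectId("65f0c9f0e3b5c1a2b3c4d5e6")}, default=default)
   '{"_id": {"$oid": "65f0c9f0e3b5c1a2b3c4d5e6"}}'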
.. note:: If your application does not need the flexibility offered by
   :class:`JSONOptions` and spends a large amount of time in the `json_util`
   module, look to `python-bsonjs <https://pypi.org/project/python-bsonjs/>`_
   for a nice performance improvement. `python-bsonjs` is a fast BSON to
   MongoDB Extended JSON converter for Python built on top of
   `libbson <https://github.com/mongodb/libbson>`_. `python-bsonjs` works best
   with PyMongo when using :class:`~bson.raw_bson.RawBSONDocument`.
"""
from __future__ import annotations

import base64
import datetime
import json
import math
import re
import uuid
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Mapping,
    MutableMapping,
    Optional,
    Sequence,
    Tuple,
    Type,
    Union,
    cast,
)

from bson.binary import ALL_UUID_SUBTYPES, UUID_SUBTYPE, Binary, UuidRepresentation
from bson.code import Code
from bson.codec_options import CodecOptions, DatetimeConversion
from bson.datetime_ms import (
    EPOCH_AWARE,
    DatetimeMS,
    _datetime_to_millis,
    _max_datetime_ms,
    _millis_to_datetime,
)
from bson.dbref import DBRef
from bson.decimal128 import Decimal128
from bson.int64 import Int64
from bson.max_key import MaxKey
from bson.min_key import MinKey
from bson.objectid import ObjectId
from bson.regex import Regex
from bson.son import RE_TYPE
from bson.timestamp import Timestamp
from bson.tz_util import utc

_RE_OPT_TABLE = {
    "i": re.I,
    "l": re.L,
    "m": re.M,
    "s": re.S,
    "u": re.U,
    "x": re.X,
}


class DatetimeRepresentation:
    LEGACY = 0
    """Legacy MongoDB Extended JSON datetime representation.

    :class:`datetime.datetime` instances will be encoded to JSON in the
    format `{"$date": <dateAsMilliseconds>}`, where `dateAsMilliseconds` is
    a 64-bit signed integer giving the number of milliseconds since the Unix
    epoch UTC. This was the default encoding before PyMongo version 3.4.

    .. versionadded:: 3.4
    """

    NUMBERLONG = 1
    """NumberLong datetime representation.

    :class:`datetime.datetime` instances will be encoded to JSON in the
    format `{"$date": {"$numberLong": "<dateAsMilliseconds>"}}`, where
    `dateAsMilliseconds` is the string representation of a 64-bit signed
    integer giving the number of milliseconds since the Unix epoch UTC.

    .. versionadded:: 3.4
    """

    ISO8601 = 2
    """ISO-8601 datetime representation.

    :class:`datetime.datetime` instances greater than or equal to the Unix
    epoch UTC will be encoded to JSON in the format
    `{"$date": "<ISO-8601 Date/Time Format>"}`. :class:`datetime.datetime`
    instances before the Unix epoch UTC will be encoded as if the datetime
    representation is :const:`~DatetimeRepresentation.NUMBERLONG`.

    .. versionadded:: 3.4
    """


class JSONMode:
    LEGACY = 0
    """Legacy Extended JSON representation.

    In this mode, :func:`~bson.json_util.dumps` produces PyMongo's legacy
    non-standard JSON output. Consider using
    :const:`~bson.json_util.JSONMode.RELAXED` or
    :const:`~bson.json_util.JSONMode.CANONICAL` instead.

    .. versionadded:: 3.5
    """

    RELAXED = 1
    """Relaxed Extended JSON representation.

    In this mode, :func:`~bson.json_util.dumps` produces Relaxed Extended
    JSON, a mostly JSON-like format. Consider using this for things like a web
    API, where one is sending a document (or a projection of a document) that
    only uses ordinary JSON type primitives. In particular, the ``int``,
    :class:`~bson.int64.Int64`, and ``float`` numeric types are represented in
    the native JSON number format. This output is also the most human readable
    and is useful for debugging and documentation.

    .. seealso:: The specification for Relaxed `Extended JSON`_.

    .. versionadded:: 3.5
    """
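    # For the same document, the modes differ only in how values are wrapped;
    # e.g. dumps({"x": 1}) yields '{"x": 1}' under LEGACY and RELAXED, but
    # '{"x": {"$numberInt": "1"}}' under CANONICAL.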
    CANONICAL = 2
    """Canonical Extended JSON representation.

    In this mode, :func:`~bson.json_util.dumps` produces Canonical Extended
    JSON, a type preserving format. Consider using this for things like
    testing, where one has to precisely specify expected types in JSON. In
    particular, the ``int``, :class:`~bson.int64.Int64`, and ``float`` numeric
    types are encoded with type wrappers.

    .. seealso:: The specification for Canonical `Extended JSON`_.

    .. versionadded:: 3.5
    """


if TYPE_CHECKING:
    _BASE_CLASS = CodecOptions[MutableMapping[str, Any]]
else:
    _BASE_CLASS = CodecOptions

_INT32_MAX = 2**31


class JSONOptions(_BASE_CLASS):
    json_mode: int
    strict_number_long: bool
    datetime_representation: int
    strict_uuid: bool
    document_class: Type[MutableMapping[str, Any]]

    def __init__(self, *args: Any, **kwargs: Any):
        """Encapsulates JSON options for :func:`dumps` and :func:`loads`.

        :param strict_number_long: If ``True``, :class:`~bson.int64.Int64`
            objects are encoded to MongoDB Extended JSON's *Strict mode* type
            `NumberLong`, i.e. ``'{"$numberLong": "<number>" }'``. Otherwise
            they will be encoded as an `int`. Defaults to ``False``.
        :param datetime_representation: The representation to use when encoding
            instances of :class:`datetime.datetime`. Defaults to
            :const:`~DatetimeRepresentation.ISO8601` under the default
            :const:`JSONMode.RELAXED`.
        :param strict_uuid: If ``True``, :class:`uuid.UUID` objects are encoded
            to MongoDB Extended JSON's *Strict mode* type `Binary`. Otherwise
            they will be encoded as ``'{"$uuid": "<hex>" }'``. Defaults to
            ``True`` under the default :const:`JSONMode.RELAXED`.
        :param json_mode: The :class:`JSONMode` to use when encoding BSON types
            to Extended JSON. Defaults to :const:`~JSONMode.RELAXED`.
        :param document_class: BSON documents returned by :func:`loads` will be
            decoded to an instance of this class. Must be a subclass of
            :class:`collections.abc.MutableMapping`. Defaults to :class:`dict`.
        :param uuid_representation: The :class:`~bson.binary.UuidRepresentation`
            to use when encoding and decoding instances of :class:`uuid.UUID`.
            Defaults to :const:`~bson.binary.UuidRepresentation.UNSPECIFIED`.
        :param tz_aware: If ``True``, MongoDB Extended JSON's *Strict mode* type
            `Date` will be decoded to timezone aware instances of
            :class:`datetime.datetime`. Otherwise they will be naive. Defaults
            to ``False``.
        :param tzinfo: A :class:`datetime.tzinfo` subclass that specifies the
            timezone from which :class:`~datetime.datetime` objects should be
            decoded. Defaults to :const:`~bson.tz_util.utc`.
        :param datetime_conversion: Specifies how UTC datetimes should be decoded
            within BSON. Valid options include 'datetime_ms' to return as a
            DatetimeMS, 'datetime' to return as a datetime.datetime and raising
            a ValueError for out-of-range values, 'datetime_auto' to return
            DatetimeMS objects when the underlying datetime is out-of-range and
            'datetime_clamp' to clamp to the minimum and maximum possible
            datetimes. Defaults to 'datetime'. See
            :ref:`handling-out-of-range-datetimes` for details.
        :param args: arguments to :class:`~bson.codec_options.CodecOptions`
        :param kwargs: arguments to :class:`~bson.codec_options.CodecOptions`

        .. seealso:: The specification for Relaxed and Canonical `Extended JSON`_.

        .. versionchanged:: 4.0
           The default for `json_mode` was changed from :const:`JSONMode.LEGACY`
           to :const:`JSONMode.RELAXED`.
           The default for `uuid_representation` was changed from
           :const:`~bson.binary.UuidRepresentation.PYTHON_LEGACY` to
           :const:`~bson.binary.UuidRepresentation.UNSPECIFIED`.

        .. versionchanged:: 3.5
           Accepts the optional parameter `json_mode`.

        .. versionchanged:: 4.0
           Changed default value of `tz_aware` to False.
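
        An illustrative example (these option values are just one valid
        combination, chosen to show the effect of ``json_mode``)::

          >>> from bson.int64 import Int64
          >>> from bson.json_util import JSONOptions, JSONMode, dumps
          >>> opts = JSONOptions(json_mode=JSONMode.CANONICAL)
          >>> dumps({"n": Int64(1)}, json_options=opts)
          '{"n": {"$numberLong": "1"}}'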
""" super().__init__() def __new__( cls: Type[JSONOptions], strict_number_long: Optional[bool] = None, datetime_representation: Optional[int] = None, strict_uuid: Optional[bool] = None, json_mode: int = JSONMode.RELAXED, *args: Any, **kwargs: Any, ) -> JSONOptions: kwargs["tz_aware"] = kwargs.get("tz_aware", False) if kwargs["tz_aware"]: kwargs["tzinfo"] = kwargs.get("tzinfo", utc) if datetime_representation not in ( DatetimeRepresentation.LEGACY, DatetimeRepresentation.NUMBERLONG, DatetimeRepresentation.ISO8601, None, ): raise ValueError( "JSONOptions.datetime_representation must be one of LEGACY, " "NUMBERLONG, or ISO8601 from DatetimeRepresentation." ) self = cast(JSONOptions, super().__new__(cls, *args, **kwargs)) # type:ignore[arg-type] if json_mode not in (JSONMode.LEGACY, JSONMode.RELAXED, JSONMode.CANONICAL): raise ValueError( "JSONOptions.json_mode must be one of LEGACY, RELAXED, " "or CANONICAL from JSONMode." ) self.json_mode = json_mode if self.json_mode == JSONMode.RELAXED: if strict_number_long: raise ValueError("Cannot specify strict_number_long=True with JSONMode.RELAXED") if datetime_representation not in (None, DatetimeRepresentation.ISO8601): raise ValueError( "datetime_representation must be DatetimeRepresentation." "ISO8601 or omitted with JSONMode.RELAXED" ) if strict_uuid not in (None, True): raise ValueError("Cannot specify strict_uuid=False with JSONMode.RELAXED") self.strict_number_long = False self.datetime_representation = DatetimeRepresentation.ISO8601 self.strict_uuid = True elif self.json_mode == JSONMode.CANONICAL: if strict_number_long not in (None, True): raise ValueError("Cannot specify strict_number_long=False with JSONMode.RELAXED") if datetime_representation not in (None, DatetimeRepresentation.NUMBERLONG): raise ValueError( "datetime_representation must be DatetimeRepresentation." "NUMBERLONG or omitted with JSONMode.RELAXED" ) if strict_uuid not in (None, True): raise ValueError("Cannot specify strict_uuid=False with JSONMode.RELAXED") self.strict_number_long = True self.datetime_representation = DatetimeRepresentation.NUMBERLONG self.strict_uuid = True else: # JSONMode.LEGACY self.strict_number_long = False self.datetime_representation = DatetimeRepresentation.LEGACY self.strict_uuid = False if strict_number_long is not None: self.strict_number_long = strict_number_long if datetime_representation is not None: self.datetime_representation = datetime_representation if strict_uuid is not None: self.strict_uuid = strict_uuid return self def _arguments_repr(self) -> str: return ( "strict_number_long={!r}, " "datetime_representation={!r}, " "strict_uuid={!r}, json_mode={!r}, {}".format( self.strict_number_long, self.datetime_representation, self.strict_uuid, self.json_mode, super()._arguments_repr(), ) ) def _options_dict(self) -> dict[Any, Any]: # TODO: PYTHON-2442 use _asdict() instead options_dict = super()._options_dict() options_dict.update( { "strict_number_long": self.strict_number_long, "datetime_representation": self.datetime_representation, "strict_uuid": self.strict_uuid, "json_mode": self.json_mode, } ) return options_dict def with_options(self, **kwargs: Any) -> JSONOptions: """ Make a copy of this JSONOptions, overriding some options:: >>> from bson.json_util import CANONICAL_JSON_OPTIONS >>> CANONICAL_JSON_OPTIONS.tz_aware True >>> json_options = CANONICAL_JSON_OPTIONS.with_options(tz_aware=False, tzinfo=None) >>> json_options.tz_aware False .. 
        opts = self._options_dict()
        for opt in ("strict_number_long", "datetime_representation", "strict_uuid", "json_mode"):
            opts[opt] = kwargs.get(opt, getattr(self, opt))
        opts.update(kwargs)
        return JSONOptions(**opts)


LEGACY_JSON_OPTIONS: JSONOptions = JSONOptions(json_mode=JSONMode.LEGACY)
""":class:`JSONOptions` for encoding to PyMongo's legacy JSON format.

.. seealso:: The documentation for :const:`bson.json_util.JSONMode.LEGACY`.

.. versionadded:: 3.5
"""

CANONICAL_JSON_OPTIONS: JSONOptions = JSONOptions(json_mode=JSONMode.CANONICAL)
""":class:`JSONOptions` for Canonical Extended JSON.

.. seealso:: The documentation for :const:`bson.json_util.JSONMode.CANONICAL`.

.. versionadded:: 3.5
"""

RELAXED_JSON_OPTIONS: JSONOptions = JSONOptions(json_mode=JSONMode.RELAXED)
""":class:`JSONOptions` for Relaxed Extended JSON.

.. seealso:: The documentation for :const:`bson.json_util.JSONMode.RELAXED`.

.. versionadded:: 3.5
"""

DEFAULT_JSON_OPTIONS: JSONOptions = RELAXED_JSON_OPTIONS
"""The default :class:`JSONOptions` for JSON encoding/decoding.

The same as :const:`RELAXED_JSON_OPTIONS`.

.. versionchanged:: 4.0
   Changed from :const:`LEGACY_JSON_OPTIONS` to :const:`RELAXED_JSON_OPTIONS`.

.. versionadded:: 3.4
"""


def dumps(obj: Any, *args: Any, **kwargs: Any) -> str:
    """Helper function that wraps :func:`json.dumps`.

    Recursive function that handles all BSON types including
    :class:`~bson.binary.Binary` and :class:`~bson.code.Code`.

    :param json_options: A :class:`JSONOptions` instance used to modify the
        encoding of MongoDB Extended JSON types. Defaults to
        :const:`DEFAULT_JSON_OPTIONS`.

    .. versionchanged:: 4.0
       Now outputs MongoDB Relaxed Extended JSON by default (using
       :const:`DEFAULT_JSON_OPTIONS`).

    .. versionchanged:: 3.4
       Accepts optional parameter `json_options`. See :class:`JSONOptions`.
    """
    json_options = kwargs.pop("json_options", DEFAULT_JSON_OPTIONS)
    return json.dumps(_json_convert(obj, json_options), *args, **kwargs)


def loads(s: Union[str, bytes, bytearray], *args: Any, **kwargs: Any) -> Any:
    """Helper function that wraps :func:`json.loads`.

    Automatically passes the object_hook for BSON type conversion.

    Raises ``TypeError``, ``ValueError``, ``KeyError``, or
    :exc:`~bson.errors.InvalidId` on invalid MongoDB Extended JSON.

    :param json_options: A :class:`JSONOptions` instance used to modify the
        decoding of MongoDB Extended JSON types. Defaults to
        :const:`DEFAULT_JSON_OPTIONS`.

    .. versionchanged:: 4.0
       Now loads :class:`datetime.datetime` instances as naive by default. To
       load timezone aware instances utilize the `json_options` parameter.
       See :ref:`tz_aware_default_change` for an example.

    .. versionchanged:: 3.5
       Parses Relaxed and Canonical Extended JSON as well as PyMongo's legacy
       format. Now raises ``TypeError`` or ``ValueError`` when parsing JSON
       type wrappers with values of the wrong type or any extra keys.

    .. versionchanged:: 3.4
       Accepts optional parameter `json_options`. See :class:`JSONOptions`.
    """
    json_options = kwargs.pop("json_options", DEFAULT_JSON_OPTIONS)
    # Execution time optimization if json_options.document_class is dict
    if json_options.document_class is dict:
        kwargs["object_hook"] = lambda obj: object_hook(obj, json_options)
    else:
        kwargs["object_pairs_hook"] = lambda pairs: object_pairs_hook(pairs, json_options)
    return json.loads(s, *args, **kwargs)


def _json_convert(obj: Any, json_options: JSONOptions = DEFAULT_JSON_OPTIONS) -> Any:
    """Recursive helper method that converts BSON types so they can be
    converted into json.
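    For instance, under :const:`CANONICAL_JSON_OPTIONS` a
    :class:`~bson.binary.Binary` value is replaced by its ``$binary`` wrapper
    before :func:`json.dumps` ever sees it (illustrative)::

      >>> _json_convert({"b": Binary(b"ab", 0)}, CANONICAL_JSON_OPTIONS)
      {'b': {'$binary': {'base64': 'YWI=', 'subType': '00'}}}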
""" if hasattr(obj, "items"): return {k: _json_convert(v, json_options) for k, v in obj.items()} elif hasattr(obj, "__iter__") and not isinstance(obj, (str, bytes)): return [_json_convert(v, json_options) for v in obj] try: return default(obj, json_options) except TypeError: return obj def object_pairs_hook( pairs: Sequence[Tuple[str, Any]], json_options: JSONOptions = DEFAULT_JSON_OPTIONS ) -> Any: return object_hook(json_options.document_class(pairs), json_options) # type:ignore[call-arg] def object_hook(dct: Mapping[str, Any], json_options: JSONOptions = DEFAULT_JSON_OPTIONS) -> Any: match = None for k in dct: if k in _PARSERS_SET: match = k break if match: return _PARSERS[match](dct, json_options) return dct def _parse_legacy_regex(doc: Any, dummy0: Any) -> Any: pattern = doc["$regex"] # Check if this is the $regex query operator. if not isinstance(pattern, (str, bytes)): return doc flags = 0 # PyMongo always adds $options but some other tools may not. for opt in doc.get("$options", ""): flags |= _RE_OPT_TABLE.get(opt, 0) return Regex(pattern, flags) def _parse_legacy_uuid(doc: Any, json_options: JSONOptions) -> Union[Binary, uuid.UUID]: """Decode a JSON legacy $uuid to Python UUID.""" if len(doc) != 1: raise TypeError(f"Bad $uuid, extra field(s): {doc}") if not isinstance(doc["$uuid"], str): raise TypeError(f"$uuid must be a string: {doc}") if json_options.uuid_representation == UuidRepresentation.UNSPECIFIED: return Binary.from_uuid(uuid.UUID(doc["$uuid"])) else: return uuid.UUID(doc["$uuid"]) def _binary_or_uuid(data: Any, subtype: int, json_options: JSONOptions) -> Union[Binary, uuid.UUID]: # special handling for UUID if subtype in ALL_UUID_SUBTYPES: uuid_representation = json_options.uuid_representation binary_value = Binary(data, subtype) if uuid_representation == UuidRepresentation.UNSPECIFIED: return binary_value if subtype == UUID_SUBTYPE: # Legacy behavior: use STANDARD with binary subtype 4. uuid_representation = UuidRepresentation.STANDARD elif uuid_representation == UuidRepresentation.STANDARD: # subtype == OLD_UUID_SUBTYPE # Legacy behavior: STANDARD is the same as PYTHON_LEGACY. 
uuid_representation = UuidRepresentation.PYTHON_LEGACY return binary_value.as_uuid(uuid_representation) if subtype == 0: return cast(uuid.UUID, data) return Binary(data, subtype) def _parse_legacy_binary(doc: Any, json_options: JSONOptions) -> Union[Binary, uuid.UUID]: if isinstance(doc["$type"], int): doc["$type"] = "%02x" % doc["$type"] subtype = int(doc["$type"], 16) if subtype >= 0xFFFFFF80: # Handle mongoexport values subtype = int(doc["$type"][6:], 16) data = base64.b64decode(doc["$binary"].encode()) return _binary_or_uuid(data, subtype, json_options) def _parse_canonical_binary(doc: Any, json_options: JSONOptions) -> Union[Binary, uuid.UUID]: binary = doc["$binary"] b64 = binary["base64"] subtype = binary["subType"] if not isinstance(b64, str): raise TypeError(f"$binary base64 must be a string: {doc}") if not isinstance(subtype, str) or len(subtype) > 2: raise TypeError(f"$binary subType must be a string at most 2 characters: {doc}") if len(binary) != 2: raise TypeError(f'$binary must include only "base64" and "subType" components: {doc}') data = base64.b64decode(b64.encode()) return _binary_or_uuid(data, int(subtype, 16), json_options) def _parse_canonical_datetime( doc: Any, json_options: JSONOptions ) -> Union[datetime.datetime, DatetimeMS]: """Decode a JSON datetime to python datetime.datetime.""" dtm = doc["$date"] if len(doc) != 1: raise TypeError(f"Bad $date, extra field(s): {doc}") # mongoexport 2.6 and newer if isinstance(dtm, str): # Parse offset if dtm[-1] == "Z": dt = dtm[:-1] offset = "Z" elif dtm[-6] in ("+", "-") and dtm[-3] == ":": # (+|-)HH:MM dt = dtm[:-6] offset = dtm[-6:] elif dtm[-5] in ("+", "-"): # (+|-)HHMM dt = dtm[:-5] offset = dtm[-5:] elif dtm[-3] in ("+", "-"): # (+|-)HH dt = dtm[:-3] offset = dtm[-3:] else: dt = dtm offset = "" # Parse the optional factional seconds portion. 
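        # e.g. for "1970-01-01T00:00:00.500Z" the offset has already been
        # stripped above, so dt ends with ".500" here and the fractional part
        # becomes microsecond=500000 below.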
dot_index = dt.rfind(".") microsecond = 0 if dot_index != -1: microsecond = int(float(dt[dot_index:]) * 1000000) dt = dt[:dot_index] aware = datetime.datetime.strptime(dt, "%Y-%m-%dT%H:%M:%S").replace( microsecond=microsecond, tzinfo=utc ) if offset and offset != "Z": if len(offset) == 6: hours, minutes = offset[1:].split(":") secs = int(hours) * 3600 + int(minutes) * 60 elif len(offset) == 5: secs = int(offset[1:3]) * 3600 + int(offset[3:]) * 60 elif len(offset) == 3: secs = int(offset[1:3]) * 3600 if offset[0] == "-": secs *= -1 aware = aware - datetime.timedelta(seconds=secs) if json_options.tz_aware: if json_options.tzinfo: aware = aware.astimezone(json_options.tzinfo) if json_options.datetime_conversion == DatetimeConversion.DATETIME_MS: return DatetimeMS(aware) return aware else: aware_tzinfo_none = aware.replace(tzinfo=None) if json_options.datetime_conversion == DatetimeConversion.DATETIME_MS: return DatetimeMS(aware_tzinfo_none) return aware_tzinfo_none return _millis_to_datetime(int(dtm), cast("CodecOptions[Any]", json_options)) def _parse_canonical_oid(doc: Any, dummy0: Any) -> ObjectId: """Decode a JSON ObjectId to bson.objectid.ObjectId.""" if len(doc) != 1: raise TypeError(f"Bad $oid, extra field(s): {doc}") return ObjectId(doc["$oid"]) def _parse_canonical_symbol(doc: Any, dummy0: Any) -> str: """Decode a JSON symbol to Python string.""" symbol = doc["$symbol"] if len(doc) != 1: raise TypeError(f"Bad $symbol, extra field(s): {doc}") return str(symbol) def _parse_canonical_code(doc: Any, dummy0: Any) -> Code: """Decode a JSON code to bson.code.Code.""" for key in doc: if key not in ("$code", "$scope"): raise TypeError(f"Bad $code, extra field(s): {doc}") return Code(doc["$code"], scope=doc.get("$scope")) def _parse_canonical_regex(doc: Any, dummy0: Any) -> Regex[str]: """Decode a JSON regex to bson.regex.Regex.""" regex = doc["$regularExpression"] if len(doc) != 1: raise TypeError(f"Bad $regularExpression, extra field(s): {doc}") if len(regex) != 2: raise TypeError( f'Bad $regularExpression must include only "pattern and "options" components: {doc}' ) opts = regex["options"] if not isinstance(opts, str): raise TypeError( "Bad $regularExpression options, options must be string, was type %s" % (type(opts)) ) return Regex(regex["pattern"], opts) def _parse_canonical_dbref(doc: Any, dummy0: Any) -> Any: """Decode a JSON DBRef to bson.dbref.DBRef.""" if ( isinstance(doc.get("$ref"), str) and "$id" in doc and isinstance(doc.get("$db"), (str, type(None))) ): return DBRef(doc.pop("$ref"), doc.pop("$id"), database=doc.pop("$db", None), **doc) return doc def _parse_canonical_dbpointer(doc: Any, dummy0: Any) -> Any: """Decode a JSON (deprecated) DBPointer to bson.dbref.DBRef.""" dbref = doc["$dbPointer"] if len(doc) != 1: raise TypeError(f"Bad $dbPointer, extra field(s): {doc}") if isinstance(dbref, DBRef): dbref_doc = dbref.as_doc() # DBPointer must not contain $db in its value. 
        if dbref.database is not None:
            raise TypeError(f"Bad $dbPointer, extra field $db: {dbref_doc}")
        if not isinstance(dbref.id, ObjectId):
            raise TypeError(f"Bad $dbPointer, $id must be an ObjectId: {dbref_doc}")
        if len(dbref_doc) != 2:
            raise TypeError(f"Bad $dbPointer, extra field(s) in DBRef: {dbref_doc}")
        return dbref
    else:
        raise TypeError(f"Bad $dbPointer, expected a DBRef: {doc}")


def _parse_canonical_int32(doc: Any, dummy0: Any) -> int:
    """Decode a JSON int32 to python int."""
    i_str = doc["$numberInt"]
    if len(doc) != 1:
        raise TypeError(f"Bad $numberInt, extra field(s): {doc}")
    if not isinstance(i_str, str):
        raise TypeError(f"$numberInt must be string: {doc}")
    return int(i_str)


def _parse_canonical_int64(doc: Any, dummy0: Any) -> Int64:
    """Decode a JSON int64 to bson.int64.Int64."""
    l_str = doc["$numberLong"]
    if len(doc) != 1:
        raise TypeError(f"Bad $numberLong, extra field(s): {doc}")
    return Int64(l_str)


def _parse_canonical_double(doc: Any, dummy0: Any) -> float:
    """Decode a JSON double to python float."""
    d_str = doc["$numberDouble"]
    if len(doc) != 1:
        raise TypeError(f"Bad $numberDouble, extra field(s): {doc}")
    if not isinstance(d_str, str):
        raise TypeError(f"$numberDouble must be string: {doc}")
    return float(d_str)


def _parse_canonical_decimal128(doc: Any, dummy0: Any) -> Decimal128:
    """Decode a JSON decimal128 to bson.decimal128.Decimal128."""
    d_str = doc["$numberDecimal"]
    if len(doc) != 1:
        raise TypeError(f"Bad $numberDecimal, extra field(s): {doc}")
    if not isinstance(d_str, str):
        raise TypeError(f"$numberDecimal must be string: {doc}")
    return Decimal128(d_str)


def _parse_canonical_minkey(doc: Any, dummy0: Any) -> MinKey:
    """Decode a JSON MinKey to bson.min_key.MinKey."""
    if type(doc["$minKey"]) is not int or doc["$minKey"] != 1:  # noqa: E721
        raise TypeError(f"$minKey value must be 1: {doc}")
    if len(doc) != 1:
        raise TypeError(f"Bad $minKey, extra field(s): {doc}")
    return MinKey()


def _parse_canonical_maxkey(doc: Any, dummy0: Any) -> MaxKey:
    """Decode a JSON MaxKey to bson.max_key.MaxKey."""
    if type(doc["$maxKey"]) is not int or doc["$maxKey"] != 1:  # noqa: E721
        raise TypeError(f"$maxKey value must be 1: {doc}")
    if len(doc) != 1:
        raise TypeError(f"Bad $maxKey, extra field(s): {doc}")
    return MaxKey()


def _parse_binary(doc: Any, json_options: JSONOptions) -> Union[Binary, uuid.UUID]:
    if "$type" in doc:
        return _parse_legacy_binary(doc, json_options)
    else:
        return _parse_canonical_binary(doc, json_options)


def _parse_timestamp(doc: Any, dummy0: Any) -> Timestamp:
    tsp = doc["$timestamp"]
    return Timestamp(tsp["t"], tsp["i"])


_PARSERS: dict[str, Callable[[Any, JSONOptions], Any]] = {
    "$oid": _parse_canonical_oid,
    "$ref": _parse_canonical_dbref,
    "$date": _parse_canonical_datetime,
    "$regex": _parse_legacy_regex,
    "$minKey": _parse_canonical_minkey,
    "$maxKey": _parse_canonical_maxkey,
    "$binary": _parse_binary,
    "$code": _parse_canonical_code,
    "$uuid": _parse_legacy_uuid,
    "$undefined": lambda _, _1: None,
    "$numberLong": _parse_canonical_int64,
    "$timestamp": _parse_timestamp,
    "$numberDecimal": _parse_canonical_decimal128,
    "$dbPointer": _parse_canonical_dbpointer,
    "$regularExpression": _parse_canonical_regex,
    "$symbol": _parse_canonical_symbol,
    "$numberInt": _parse_canonical_int32,
    "$numberDouble": _parse_canonical_double,
}
_PARSERS_SET = set(_PARSERS)


def _encode_binary(data: bytes, subtype: int, json_options: JSONOptions) -> Any:
    if json_options.json_mode == JSONMode.LEGACY:
        return {"$binary": base64.b64encode(data).decode(), "$type": "%02x" % subtype}
    return {"$binary": {"base64":
base64.b64encode(data).decode(), "subType": "%02x" % subtype}} def _encode_datetimems(obj: Any, json_options: JSONOptions) -> dict: if ( json_options.datetime_representation == DatetimeRepresentation.ISO8601 and 0 <= int(obj) <= _max_datetime_ms() ): return _encode_datetime(obj.as_datetime(), json_options) elif json_options.datetime_representation == DatetimeRepresentation.LEGACY: return {"$date": str(int(obj))} return {"$date": {"$numberLong": str(int(obj))}} def _encode_code(obj: Code, json_options: JSONOptions) -> dict: if obj.scope is None: return {"$code": str(obj)} else: return {"$code": str(obj), "$scope": _json_convert(obj.scope, json_options)} def _encode_int64(obj: Int64, json_options: JSONOptions) -> Any: if json_options.strict_number_long: return {"$numberLong": str(obj)} else: return int(obj) def _encode_noop(obj: Any, dummy0: Any) -> Any: return obj def _encode_regex(obj: Any, json_options: JSONOptions) -> dict: flags = "" if obj.flags & re.IGNORECASE: flags += "i" if obj.flags & re.LOCALE: flags += "l" if obj.flags & re.MULTILINE: flags += "m" if obj.flags & re.DOTALL: flags += "s" if obj.flags & re.UNICODE: flags += "u" if obj.flags & re.VERBOSE: flags += "x" if isinstance(obj.pattern, str): pattern = obj.pattern else: pattern = obj.pattern.decode("utf-8") if json_options.json_mode == JSONMode.LEGACY: return {"$regex": pattern, "$options": flags} return {"$regularExpression": {"pattern": pattern, "options": flags}} def _encode_int(obj: int, json_options: JSONOptions) -> Any: if json_options.json_mode == JSONMode.CANONICAL: if -_INT32_MAX <= obj < _INT32_MAX: return {"$numberInt": str(obj)} return {"$numberLong": str(obj)} return obj def _encode_float(obj: float, json_options: JSONOptions) -> Any: if json_options.json_mode != JSONMode.LEGACY: if math.isnan(obj): return {"$numberDouble": "NaN"} elif math.isinf(obj): representation = "Infinity" if obj > 0 else "-Infinity" return {"$numberDouble": representation} elif json_options.json_mode == JSONMode.CANONICAL: # repr() will return the shortest string guaranteed to produce the # original value, when float() is called on it. 
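            # e.g. repr(0.1) == "0.1", so float("0.1") recovers the exact same
            # double on decode.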
return {"$numberDouble": str(repr(obj))} return obj def _encode_datetime(obj: datetime.datetime, json_options: JSONOptions) -> dict: if json_options.datetime_representation == DatetimeRepresentation.ISO8601: if not obj.tzinfo: obj = obj.replace(tzinfo=utc) assert obj.tzinfo is not None if obj >= EPOCH_AWARE: off = obj.tzinfo.utcoffset(obj) if (off.days, off.seconds, off.microseconds) == (0, 0, 0): # type: ignore tz_string = "Z" else: tz_string = obj.strftime("%z") millis = int(obj.microsecond / 1000) fracsecs = ".%03d" % (millis,) if millis else "" return { "$date": "{}{}{}".format(obj.strftime("%Y-%m-%dT%H:%M:%S"), fracsecs, tz_string) } millis = _datetime_to_millis(obj) if json_options.datetime_representation == DatetimeRepresentation.LEGACY: return {"$date": millis} return {"$date": {"$numberLong": str(millis)}} def _encode_bytes(obj: bytes, json_options: JSONOptions) -> dict: return _encode_binary(obj, 0, json_options) def _encode_binary_obj(obj: Binary, json_options: JSONOptions) -> dict: return _encode_binary(obj, obj.subtype, json_options) def _encode_uuid(obj: uuid.UUID, json_options: JSONOptions) -> dict: if json_options.strict_uuid: binval = Binary.from_uuid(obj, uuid_representation=json_options.uuid_representation) return _encode_binary(binval, binval.subtype, json_options) else: return {"$uuid": obj.hex} def _encode_objectid(obj: ObjectId, dummy0: Any) -> dict: return {"$oid": str(obj)} def _encode_timestamp(obj: Timestamp, dummy0: Any) -> dict: return {"$timestamp": {"t": obj.time, "i": obj.inc}} def _encode_decimal128(obj: Timestamp, dummy0: Any) -> dict: return {"$numberDecimal": str(obj)} def _encode_dbref(obj: DBRef, json_options: JSONOptions) -> dict: return _json_convert(obj.as_doc(), json_options=json_options) def _encode_minkey(dummy0: Any, dummy1: Any) -> dict: return {"$minKey": 1} def _encode_maxkey(dummy0: Any, dummy1: Any) -> dict: return {"$maxKey": 1} # Encoders for BSON types # Each encoder function's signature is: # - obj: a Python data type, e.g. a Python int for _encode_int # - json_options: a JSONOptions _ENCODERS: dict[Type, Callable[[Any, JSONOptions], Any]] = { bool: _encode_noop, bytes: _encode_bytes, datetime.datetime: _encode_datetime, DatetimeMS: _encode_datetimems, float: _encode_float, int: _encode_int, str: _encode_noop, type(None): _encode_noop, uuid.UUID: _encode_uuid, Binary: _encode_binary_obj, Int64: _encode_int64, Code: _encode_code, DBRef: _encode_dbref, MaxKey: _encode_maxkey, MinKey: _encode_minkey, ObjectId: _encode_objectid, Regex: _encode_regex, RE_TYPE: _encode_regex, Timestamp: _encode_timestamp, Decimal128: _encode_decimal128, } # Map each _type_marker to its encoder for faster lookup. _MARKERS: dict[int, Callable[[Any, JSONOptions], Any]] = {} for _typ in _ENCODERS: if hasattr(_typ, "_type_marker"): _MARKERS[_typ._type_marker] = _ENCODERS[_typ] _BUILT_IN_TYPES = tuple(t for t in _ENCODERS) def default(obj: Any, json_options: JSONOptions = DEFAULT_JSON_OPTIONS) -> Any: # First see if the type is already cached. KeyError will only ever # happen once per subtype. try: return _ENCODERS[type(obj)](obj, json_options) except KeyError: pass # Second, fall back to trying _type_marker. This has to be done # before the loop below since users could subclass one of our # custom types that subclasses a python built-in (e.g. Binary) if hasattr(obj, "_type_marker"): marker = obj._type_marker if marker in _MARKERS: func = _MARKERS[marker] # Cache this type for faster subsequent lookup. 
_ENCODERS[type(obj)] = func return func(obj, json_options) # Third, test each base type. This will only happen once for # a subtype of a supported base type. for base in _BUILT_IN_TYPES: if isinstance(obj, base): func = _ENCODERS[base] # Cache this type for faster subsequent lookup. _ENCODERS[type(obj)] = func return func(obj, json_options) raise TypeError("%r is not JSON serializable" % obj) def _get_str_size(obj: Any) -> int: return len(obj) def _get_datetime_size(obj: datetime.datetime) -> int: return 5 + len(str(obj.time())) def _get_regex_size(obj: Regex) -> int: return 18 + len(obj.pattern) def _get_dbref_size(obj: DBRef) -> int: return 34 + len(obj.collection) _CONSTANT_SIZE_TABLE: dict[Any, int] = { ObjectId: 28, int: 11, Int64: 11, Decimal128: 11, Timestamp: 14, MinKey: 8, MaxKey: 8, } _VARIABLE_SIZE_TABLE: dict[Any, Callable[[Any], int]] = { str: _get_str_size, bytes: _get_str_size, datetime.datetime: _get_datetime_size, Regex: _get_regex_size, DBRef: _get_dbref_size, } def get_size(obj: Any, max_size: int, current_size: int = 0) -> int: """Recursively finds size of objects""" if current_size >= max_size: return current_size obj_type = type(obj) # Check to see if the obj has a constant size estimate try: return _CONSTANT_SIZE_TABLE[obj_type] except KeyError: pass # Check to see if the obj has a variable but simple size estimate try: return _VARIABLE_SIZE_TABLE[obj_type](obj) except KeyError: pass # Special cases that require recursion if obj_type == Code: if obj.scope: current_size += ( 5 + get_size(obj.scope, max_size, current_size) + len(obj) - len(obj.scope) ) else: current_size += 5 + len(obj) elif obj_type == dict: for k, v in obj.items(): current_size += get_size(k, max_size, current_size) current_size += get_size(v, max_size, current_size) if current_size >= max_size: return current_size elif hasattr(obj, "__iter__"): for i in obj: current_size += get_size(i, max_size, current_size) if current_size >= max_size: return current_size return current_size def _truncate_documents(obj: Any, max_length: int) -> Tuple[Any, int]: """Recursively truncate documents as needed to fit inside max_length characters.""" if max_length <= 0: return None, 0 remaining = max_length if hasattr(obj, "items"): truncated: Any = {} for k, v in obj.items(): truncated_v, remaining = _truncate_documents(v, remaining) if truncated_v: truncated[k] = truncated_v if remaining <= 0: break return truncated, remaining elif hasattr(obj, "__iter__") and not isinstance(obj, (str, bytes)): truncated: Any = [] # type:ignore[no-redef] for v in obj: truncated_v, remaining = _truncate_documents(v, remaining) if truncated_v: truncated.append(truncated_v) if remaining <= 0: break return truncated, remaining else: return _truncate(obj, remaining) def _truncate(obj: Any, remaining: int) -> Tuple[Any, int]: size = get_size(obj, remaining) if size <= remaining: return obj, remaining - size else: try: truncated = obj[:remaining] except TypeError: truncated = obj return truncated, remaining - size
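
# A rough illustration of the truncation helpers above (they are private and
# their size estimates are heuristic, not exact JSON lengths): truncating
# {"x": "a" * 100, "y": 2} with max_length=20 keeps only "x", cut to its first
# 20 characters, and drops "y" because the character budget is already spent.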