# Copyright 2024-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""GCP helpers."""
from __future__ import annotations

from typing import Any


def _get_gcp_response(resource: str, timeout: float = 5) -> dict[str, Any]:
    """Request an identity token for ``resource`` from the GCP metadata server.

    Sends a GET request to the default service account's ``identity``
    endpoint with ``resource`` as the ``audience`` query parameter and the
    ``Metadata-Flavor: Google`` header.

    :param resource: The audience value to request the token for. It is
        URL-encoded before being placed in the query string.
    :param timeout: Timeout, in seconds, passed to ``urlopen``.
    :return: A dict with a single ``"access_token"`` key whose value is the
        response body decoded as UTF-8.
    :raises ValueError: If the request fails for any reason or the response
        status is not 200.
    """
    # Local imports: these names are only needed when this helper is called.
    from urllib.parse import urlencode
    from urllib.request import Request, urlopen

    url = "http://metadata/computeMetadata/v1/instance/service-accounts/default/identity"
    # Encode the audience so characters such as "&", "#", "+" or spaces cannot
    # truncate the value, inject extra parameters, or make the URL invalid.
    url += "?" + urlencode({"audience": resource})
    headers = {"Metadata-Flavor": "Google"}
    request = Request(url, headers=headers)  # noqa: S310
    try:
        with urlopen(request, timeout=timeout) as response:  # noqa: S310
            status = response.status
            body = response.read().decode("utf8")
    except Exception as e:
        # Every failure (network, HTTP error, decoding) is reported as a
        # ValueError; ``from None`` deliberately hides the original traceback.
        msg = f"Failed to acquire IMDS access token: {e}"
        raise ValueError(msg) from None

    # urlopen raises for HTTP error statuses, but other non-200 responses can
    # still reach this point and are rejected here.
    if status != 200:
        msg = "Failed to acquire IMDS access token."
        raise ValueError(msg)

    return {"access_token": body}