import os import token import requests import simplejson as json from pydantic import ( PrivateAttr, BaseModel, SecretStr, HttpUrl ) endpoints = { "alpha": { "auth": "https://oo0wks9pbi.execute-api.us-east-1.amazonaws.com/alpha/", "content": "https://w1yygdhayc.execute-api.us-east-1.amazonaws.com/alpha/", } } class APIClient(BaseModel): # Internal State (Not passed in __init__) ## PrivateAttr to separate state from config and avoid Pylint errors debug: bool = False environment: str = "alpha" client_id: SecretStr client_secret: SecretStr _access_token: str = PrivateAttr(default="") _access_token_timeout: int = PrivateAttr(default=0) def make_request(self, method: str, endpoint: str, path: str, data: dict = None, authenticate: bool = True) -> dict: if endpoint not in endpoints[self.environment]: raise ValueError(f"Endpoint '{endpoint}' is not defined in the API client.") if method.upper() not in ["GET", "POST", "PUT", "DELETE"]: raise ValueError(f"HTTP method '{method}' is not supported.") method = method.lower() url = f"{endpoints[self.environment][endpoint]}{path}" headers = { "Content-Type": "application/json", } if authenticate: token = self._access_token headers["Authorization"] = f"Bearer {token}" if self.debug: print({ "method": method.upper(), "url": url, "headers": headers, "payload": data }) try: response = requests.request(method.upper(), url, json=data, headers=headers, timeout=20) return_json = response.json() except requests.exceptions.RequestException as e: raise ConnectionError(f"An error occurred while making the request: {e}") except json.JSONDecodeError as e: raise ValueError(f"Response is not valid JSON: {e}") return return_json def sign_in(self,) -> None: """ Authenticates with the API to obtain an access token. Sends POST request to OAuth2 token endpoint with client_id and client_secret to retrieve access token and its expiration time. """ # Endpoint for API requests access token for ~3 months payload = { # Data package for endpoint to get the access token "client_id": self.client_id.get_secret_value(), "client_secret": self.client_secret.get_secret_value(), "grant_type": "client_credentials" } token_data = self.make_request("POST", "auth", "token", payload, authenticate=False) if 'tokenData' not in token_data: raise ValueError(f"Unexpected response structure: {token_data}") if self.debug: print(f"Token data received: {token_data}") # Debug statement to check token data structure # Access token and its timeout timestamp. self._access_token = token_data["tokenData"]["accessToken"] self._access_token_timeout = token_data["tokenData"]["accessTimeOut"] return { "access_token": self._access_token, "access_token_timeout": self._access_token_timeout } def set_token(self, token_data: dict) -> None: """ Manually set the access token and its timeout. Args: token_data (dict): A dictionary containing the access token and its expiration time. """ self._access_token = token_data.get("access_token", "") self._access_token_timeout = token_data.get("access_token_timeout", 0) def is_token_valid(self) -> bool: """ Checks if the current access token is still valid based on the current time and the token's expiration time. Returns: bool: True if the token is valid, False otherwise. """ import time current_time = int(time.time()) return self._access_token and current_time < self._access_token_timeout def run_job(self, job_function, *args, **kwargs): """ Utility method to run a job function with the API client as the first argument. Args: job_function (string): The job function to execute. *args: Positional arguments to pass to the job function. **kwargs: Keyword arguments to pass to the job function. Returns: The result of the job function execution. """ if isinstance(job_function, str): # Dynamically import the job function from the jobs module module_name, func_name = job_function.rsplit('.', 1) module = __import__(f"jobs.{module_name}", fromlist=[func_name]) job_func = getattr(module, func_name) else: job_func = job_function return job_func(self, *args, **kwargs)