trustcafe-api-wrapper/src/trustcafeapiwrapper/apiclient.py
2026-04-07 22:32:13 +01:00

190 lines
7.8 KiB
Python

import requests
import simplejson as json
from urllib.parse import urlencode
from pydantic import (
PrivateAttr, BaseModel, SecretStr, HttpUrl
)
endpoints = {
"alpha": {
"audrey": "https://eso1of8gqd.execute-api.us-east-1.amazonaws.com/alpha/",
"auth": "https://oo0wks9pbi.execute-api.us-east-1.amazonaws.com/alpha/",
"content": "https://w1yygdhayc.execute-api.us-east-1.amazonaws.com/alpha/",
"megaphone": "https://opdhjaktnl.execute-api.us-east-1.amazonaws.com/alpha/",
"moderation": "https://sbb1xrqsf1.execute-api.us-east-1.amazonaws.com/alpha/",
"spider": "https://xuvz1oj1sk.execute-api.us-east-1.amazonaws.com/dev/"
},
"production": {
"audrey": "https://iiouau5d2k.execute-api.us-east-1.amazonaws.com/production/",
"auth": "https://e2we3nktl4.execute-api.us-east-1.amazonaws.com/production/",
"content": "https://32hho6rvg1.execute-api.us-east-1.amazonaws.com/production/",
"megaphone": "https://yfnamdpnc8.execute-api.us-east-1.amazonaws.com/production/",
"moderation": "https://egfpoo3mw3.execute-api.us-east-1.amazonaws.com/production/",
"spider": "https://coi5kvgypc.execute-api.us-east-1.amazonaws.com/production/"
},
}
class APIClient(BaseModel):
# Internal State (Not passed in __init__)
## PrivateAttr to separate state from config and avoid Pylint errors
debug: bool = False
environment: str = "alpha"
client_id: SecretStr
client_secret: SecretStr
_access_token: str = PrivateAttr(default="")
_access_token_timeout: int = PrivateAttr(default=0)
def __init__(self, **data):
super().__init__(**data)
self.validate_environment()
def validate_environment(self):
if self.environment not in endpoints:
raise ValueError(f"Environment '{self.environment}' is not valid. Must be one of: {list(endpoints.keys())}")
def make_request(self, method: str, endpoint: str, path: str, data: dict = None, authenticate: bool = True, query_params: dict = None) -> dict:
# Make sure the endpoint is defined in the endpoints dictionary
if endpoint not in endpoints[self.environment]:
raise ValueError(f"Endpoint '{endpoint}' is not defined in the API client.")
# Make sure the HTTP method is valid
if method.upper() not in ["GET", "POST", "PUT", "DELETE"]:
raise ValueError(f"HTTP method '{method}' is not supported.")
method = method.lower()
# Construct the full URL for the API request
url = f"{endpoints[self.environment][endpoint]}{path}"
# Append query parameters to the URL if provided
if query_params is not None and isinstance(query_params, dict):
url += f"?{urlencode(query_params)}"
# Set up headers for the request
headers = {
"Content-Type": "application/json",
}
# Add the Authorization header with the access token if authentication is required
if authenticate:
token = self._access_token
headers["Authorization"] = f"Bearer {token}"
# Debugging output to show the request details before making the API call
if self.debug:
print({
"method": method.upper(),
"url": url,
"headers": headers,
"payload": data
})
# Make the API request and handle potential exceptions
try:
response = requests.request(method.upper(), url, json=data, headers=headers, timeout=20)
return_json = response.json()
except requests.exceptions.RequestException as e:
raise ConnectionError(f"An error occurred while making the request: {e}")
except json.JSONDecodeError as e:
raise ValueError(f"Response is not valid JSON: {e}")
return return_json
def sign_in(self,) -> dict:
"""
Authenticates with the API to obtain an access token.
Sends POST request to OAuth2 token endpoint with client_id and
client_secret to retrieve access token and its expiration time.
"""
# Endpoint for API requests access token for ~3 months
payload = { # Data package for endpoint to get the access token
"client_id": self.client_id.get_secret_value(),
"client_secret": self.client_secret.get_secret_value(),
"grant_type": "client_credentials"
}
token_data = self.make_request("POST", "auth", "token", payload, authenticate=False)
if 'tokenData' not in token_data:
raise ValueError(f"Unexpected response structure: {token_data}")
if self.debug:
print(f"Token data received: {token_data}") # Debug statement to check token data structure
# Access token and its timeout timestamp.
self._access_token = token_data["tokenData"]["accessToken"]
self._access_token_timeout = token_data["tokenData"]["accessTimeOut"]
return {
"access_token": self._access_token,
"access_token_timeout": self._access_token_timeout
}
def set_token(self, token_data: dict) -> None:
"""
Manually set the access token and its timeout.
Args:
token_data (dict): A dictionary containing the access token and its expiration time.
"""
self._access_token = token_data.get("access_token", "")
self._access_token_timeout = token_data.get("access_token_timeout", 0)
def set_environment(self, environment: str) -> None:
"""
Set the environment for the API client.
Args:
environment (str): The environment to set (e.g., "alpha", "production").
"""
self.validate_environment()
self.environment = environment
def is_token_valid(self) -> bool:
"""
Checks if the current access token is still valid based on the current time and the token's expiration time.
Returns:
bool: True if the token is valid, False otherwise.
"""
import time
current_time = int(time.time())
return self._access_token and current_time < self._access_token_timeout
def run_job(self, job_function, *args, **kwargs):
"""
Utility method to run a job function with the API client as the first argument.
Args:
job_function (string): The job function to execute.
*args: Positional arguments to pass to the job function.
**kwargs: Keyword arguments to pass to the job function.
Returns:
The result of the job function execution.
"""
if isinstance(job_function, str):
# Dynamically import the job function from the jobs module
module_name, func_name = job_function.rsplit('.', 1)
module = __import__(f"trustcafeapiwrapper.jobs.{module_name}", fromlist=[func_name])
job_func = getattr(module, func_name)
else:
job_func = job_function
return job_func(self, *args, **kwargs)
def wrapped(self, wrapped_data):
"""
Utility method to run a job function based on a wrapped data dictionary
containing 'job' and 'payload' keys as expected by the API client wrapper functions.
Args:
wrapped_data (dict): A dictionary with 'job' (string) and 'payload' (dict) keys.
Returns:
The result of the job function execution.
"""
return self.run_job(wrapped_data.get("job_function"), wrapped_data.get("payload", {}))