164 lines
6.4 KiB
Python
164 lines
6.4 KiB
Python
import os
|
|
import token
|
|
import requests
|
|
import simplejson as json
|
|
from urllib.parse import urlencode
|
|
|
|
from pydantic import (
|
|
PrivateAttr, BaseModel, SecretStr, HttpUrl
|
|
)
|
|
# Base URLs of the API Gateway services, keyed first by environment name and
# then by service name. Every URL ends with a trailing slash, so request paths
# are appended to it directly.
endpoints = {
    "alpha": {
        "audrey": "https://eso1of8gqd.execute-api.us-east-1.amazonaws.com/alpha/",
        "auth": "https://oo0wks9pbi.execute-api.us-east-1.amazonaws.com/alpha/",
        "content": "https://w1yygdhayc.execute-api.us-east-1.amazonaws.com/alpha/",
        "megaphone": "https://opdhjaktnl.execute-api.us-east-1.amazonaws.com/alpha/",
    },
}
|
|
class APIClient(BaseModel):
    """HTTP client for the services listed in the module-level ``endpoints``.

    Configuration is supplied as pydantic fields. The access token and its
    expiry are held in private attributes so that runtime state stays
    separate from configuration.
    """

    # Configuration (passed to __init__)
    debug: bool = False
    environment: str = "alpha"

    client_id: SecretStr
    client_secret: SecretStr

    # Internal state (not passed to __init__)
    ## PrivateAttr to separate state from config and avoid Pylint errors
    _access_token: str = PrivateAttr(default="")
    _access_token_timeout: int = PrivateAttr(default=0)

    def make_request(
        self,
        method: str,
        endpoint: str,
        path: str,
        data: dict = None,
        authenticate: bool = True,
        query_params: dict = None,
    ) -> dict:
        """Send a JSON request to one of the configured services.

        Args:
            method: HTTP verb (case-insensitive); one of GET, POST, PUT, DELETE.
            endpoint: Service name, a key of ``endpoints[self.environment]``.
            path: Path appended verbatim to the service's base URL.
            data: Optional JSON-serialisable request body.
            authenticate: When true, send the stored access token as a
                ``Bearer`` Authorization header.
            query_params: Optional mapping that is URL-encoded and appended
                to the URL as a query string.

        Returns:
            The decoded JSON body of the response.

        Raises:
            ValueError: If the environment, endpoint or HTTP method is not
                supported, or if the response body is not valid JSON.
            ConnectionError: If the HTTP request itself fails.
        """
        # Make sure the environment and the endpoint are both defined
        environment_endpoints = endpoints.get(self.environment)
        if environment_endpoints is None:
            raise ValueError(f"Environment '{self.environment}' is not defined in the API client.")
        if endpoint not in environment_endpoints:
            raise ValueError(f"Endpoint '{endpoint}' is not defined in the API client.")

        # Make sure the HTTP method is valid (normalised once to upper case)
        http_method = method.upper()
        if http_method not in ("GET", "POST", "PUT", "DELETE"):
            raise ValueError(f"HTTP method '{method}' is not supported.")

        # Construct the full URL for the API request
        url = f"{environment_endpoints[endpoint]}{path}"
        # Append query parameters only when there is something to encode, so
        # an empty mapping does not leave a dangling "?" on the URL.
        if isinstance(query_params, dict) and query_params:
            url += f"?{urlencode(query_params)}"

        # Set up headers for the request
        headers = {
            "Content-Type": "application/json",
        }
        # Add the Authorization header with the access token if authentication is required
        if authenticate:
            headers["Authorization"] = f"Bearer {self._access_token}"

        # Debugging output to show the request details before making the API call
        # NOTE(review): this prints the bearer token and the raw payload (which
        # contains the client secret during sign_in) - enable only in trusted
        # environments.
        if self.debug:
            print({
                "method": http_method,
                "url": url,
                "headers": headers,
                "payload": data
            })

        # Transport failures and decoding failures are handled separately.
        # The JSON error raised by requests is itself a RequestException, so
        # a single try block would report invalid JSON as a ConnectionError.
        try:
            response = requests.request(http_method, url, json=data, headers=headers, timeout=20)
        except requests.exceptions.RequestException as e:
            raise ConnectionError(f"An error occurred while making the request: {e}") from e

        try:
            # Every JSON decode error (json, simplejson, requests) is a ValueError.
            return response.json()
        except ValueError as e:
            raise ValueError(f"Response is not valid JSON: {e}") from e

    def sign_in(self) -> dict:
        """
        Authenticates with the API to obtain an access token.

        Sends a POST request to the "auth" service's "token" path with
        client_id, client_secret and the "client_credentials" grant type,
        then stores the returned access token and its timeout on the client.

        Returns:
            A dict with "access_token" and "access_token_timeout" keys, in
            the same shape that set_token() accepts.

        Raises:
            ValueError: If the response lacks the expected "tokenData"
                structure or is not valid JSON.
            ConnectionError: If the HTTP request fails.
        """
        # The original author's note says the issued token lasts ~3 months.
        payload = {  # Data package for endpoint to get the access token
            "client_id": self.client_id.get_secret_value(),
            "client_secret": self.client_secret.get_secret_value(),
            "grant_type": "client_credentials"
        }
        token_data = self.make_request("POST", "auth", "token", payload, authenticate=False)

        # Validate the outer and inner structure before touching client state
        token_info = token_data.get("tokenData") if isinstance(token_data, dict) else None
        if (
            not isinstance(token_info, dict)
            or "accessToken" not in token_info
            or "accessTimeOut" not in token_info
        ):
            raise ValueError(f"Unexpected response structure: {token_data}")

        # NOTE(review): this prints the access token in clear text.
        if self.debug:
            print(f"Token data received: {token_data}")  # Debug statement to check token data structure

        # Access token and its timeout timestamp.
        self._access_token = token_info["accessToken"]
        self._access_token_timeout = token_info["accessTimeOut"]

        return {
            "access_token": self._access_token,
            "access_token_timeout": self._access_token_timeout
        }

    def set_token(self, token_data: dict) -> None:
        """
        Manually set the access token and its timeout.

        Args:
            token_data (dict): A dictionary with "access_token" and
                "access_token_timeout" keys (the shape sign_in() returns).
                Missing keys fall back to "" and 0 respectively.
        """
        self._access_token = token_data.get("access_token", "")
        self._access_token_timeout = token_data.get("access_token_timeout", 0)

    def is_token_valid(self) -> bool:
        """
        Checks if the current access token is still valid based on the current time and the token's expiration time.

        Returns:
            bool: True if a token is stored and the current time is before
            its timeout, False otherwise.
        """
        import time

        # Assumes the stored timeout is a Unix timestamp in seconds, since it
        # is compared against time.time() - TODO confirm against the API.
        current_time = int(time.time())
        # bool() so that an empty token yields False rather than "".
        return bool(self._access_token) and current_time < self._access_token_timeout

    def run_job(self, job_function, *args, **kwargs):
        """
        Utility method to run a job function with the API client as the first argument.

        Args:
            job_function: Either a callable, or a "<module>.<function>"
                string that is resolved inside the ``jobs`` package.
            *args: Positional arguments to pass to the job function.
            **kwargs: Keyword arguments to pass to the job function.

        Returns:
            The result of the job function execution.

        Raises:
            ValueError: If a string job name is not of the form
                "<module>.<function>".
        """
        if isinstance(job_function, str):
            import importlib

            # NOTE(review): the string selects code to import and execute;
            # only pass job names from trusted sources.
            module_name, separator, func_name = job_function.rpartition(".")
            if not (separator and module_name and func_name):
                raise ValueError(
                    f"Job '{job_function}' must be given as '<module>.<function>'."
                )
            # Dynamically import the job function from the jobs package
            module = importlib.import_module(f"jobs.{module_name}")
            job_func = getattr(module, func_name)
        else:
            job_func = job_function

        return job_func(self, *args, **kwargs)

    def wrapped(self, wrapped_data):
        """
        Utility method to run a job function based on a wrapped data dictionary.

        Args:
            wrapped_data (dict): A dictionary with a 'job_function' key (a
                callable or "<module>.<function>" string, as accepted by
                run_job) and an optional 'payload' key, which defaults to {}
                and is passed to the job as its single positional argument.

        Returns:
            The result of the job function execution.

        Raises:
            ValueError: If 'job_function' is missing from wrapped_data.
        """
        job_function = wrapped_data.get("job_function")
        if job_function is None:
            raise ValueError("wrapped_data must contain a 'job_function' entry.")

        return self.run_job(job_function, wrapped_data.get("payload", {}))
|
|
|
|
|