Working example
This commit is contained in:
parent
3b8e1e558b
commit
070148cabe
3 changed files with 75 additions and 10 deletions
5
.gitignore
vendored
5
.gitignore
vendored
|
|
@ -18,4 +18,7 @@ __pycache__/
|
|||
**/__pycache__/
|
||||
|
||||
|
||||
.vscode/*
|
||||
.vscode/*
|
||||
|
||||
# Test Artifacts
|
||||
token_data.json
|
||||
53
apiclient.py
53
apiclient.py
|
|
@ -8,7 +8,7 @@ from pydantic import (
|
|||
endpoints = {
|
||||
"alpha": {
|
||||
"auth": "https://oo0wks9pbi.execute-api.us-east-1.amazonaws.com/alpha/",
|
||||
"content": "https://oo0wks9pbi.execute-api.us-east-1.amazonaws.com/alpha/",
|
||||
"content": "https://w1yygdhayc.execute-api.us-east-1.amazonaws.com/alpha/",
|
||||
}
|
||||
}
|
||||
class APIClient(BaseModel):
|
||||
|
|
@ -33,8 +33,6 @@ class APIClient(BaseModel):
|
|||
raise ValueError(f"HTTP method '{method}' is not supported.")
|
||||
|
||||
method = method.lower()
|
||||
func = getattr(requests, method)
|
||||
|
||||
url = f"{endpoints[self.environment][endpoint]}{path}"
|
||||
|
||||
headers = {
|
||||
|
|
@ -51,7 +49,7 @@ class APIClient(BaseModel):
|
|||
"payload": data
|
||||
})
|
||||
try:
|
||||
response = requests.request(method, url, json=data, headers=headers, timeout=20)
|
||||
response = requests.request(method.upper(), url, json=data, headers=headers, timeout=20)
|
||||
return_json = response.json()
|
||||
except requests.exceptions.RequestException as e:
|
||||
raise ConnectionError(f"An error occurred while making the request: {e}")
|
||||
|
|
@ -85,3 +83,50 @@ class APIClient(BaseModel):
|
|||
if self.debug:
|
||||
print(f"Debug: Current access token: {self._access_token}")
|
||||
|
||||
return {
|
||||
"access_token": self._access_token,
|
||||
"access_token_timeout": self._access_token_timeout
|
||||
}
|
||||
|
||||
def set_token(self, token_data: dict) -> None:
|
||||
"""
|
||||
Manually set the access token and its timeout.
|
||||
|
||||
Args:
|
||||
token_data (dict): A dictionary containing the access token and its expiration time.
|
||||
"""
|
||||
self._access_token = token_data.get("access_token", "")
|
||||
self._access_token_timeout = token_data.get("access_token_timeout", 0)
|
||||
|
||||
def is_token_valid(self) -> bool:
|
||||
"""
|
||||
Checks if the current access token is still valid based on the current time and the token's expiration time.
|
||||
|
||||
Returns:
|
||||
bool: True if the token is valid, False otherwise.
|
||||
"""
|
||||
import time
|
||||
current_time = int(time.time())
|
||||
return self._access_token and current_time < self._access_token_timeout
|
||||
|
||||
def run_job(self, job_function, *args, **kwargs):
|
||||
"""
|
||||
Utility method to run a job function with the API client as the first argument.
|
||||
|
||||
Args:
|
||||
job_function (string): The job function to execute.
|
||||
*args: Positional arguments to pass to the job function.
|
||||
**kwargs: Keyword arguments to pass to the job function.
|
||||
|
||||
Returns:
|
||||
The result of the job function execution.
|
||||
"""
|
||||
if isinstance(job_function, str):
|
||||
# Dynamically import the job function from the jobs module
|
||||
module_name, func_name = job_function.rsplit('.', 1)
|
||||
module = __import__(f"jobs.{module_name}", fromlist=[func_name])
|
||||
job_func = getattr(module, func_name)
|
||||
else:
|
||||
job_func = job_function
|
||||
|
||||
return job_func(self, *args, **kwargs)
|
||||
27
testing.py
27
testing.py
|
|
@ -1,16 +1,33 @@
|
|||
"""
|
||||
Demonstrate basic usage
|
||||
"""
|
||||
from apiclient import APIClient
|
||||
import jobs
|
||||
import os
|
||||
import os, simplejson as json
|
||||
|
||||
# Handle environment variables
|
||||
from dotenv import load_dotenv
|
||||
load_dotenv()
|
||||
|
||||
# Initialize API client with credentials from environment variables
|
||||
API = APIClient(
|
||||
client_id=os.getenv("client_id"),
|
||||
client_secret=os.getenv("client_secret"),
|
||||
debug=True
|
||||
)
|
||||
API.sign_in()
|
||||
# profile = requests.userprofile
|
||||
profile = jobs.userprofile.get(API, "simon-little")
|
||||
|
||||
# Keep a token cache to avoid unnecessary sign-ins during development.
|
||||
# (In production, you'd handle this more robustly and securely)
|
||||
|
||||
if os.path.exists("token_data.json"):
|
||||
with open("token_data.json", "r") as f:
|
||||
token_data = json.load(f)
|
||||
API.set_token(token_data)
|
||||
|
||||
# Get a new one if we don't have one or if the existing one is expired
|
||||
if not API.is_token_valid():
|
||||
tokendata = API.sign_in()
|
||||
with open("token_data.json", "w") as f:
|
||||
json.dump(tokendata, f, indent=4)
|
||||
|
||||
profile = API.run_job('userprofile.get', "simon-little")
|
||||
print(profile)
|
||||
Loading…
Add table
Add a link
Reference in a new issue