Return token data from handle_token

Notes for dev
This commit is contained in:
simonwt 2026-04-16 21:08:54 +01:00
parent 89fc8706cb
commit cd1f0a2ae3
5 changed files with 55 additions and 6 deletions

View file

@ -26,9 +26,24 @@ potentially misleading.
2. Make more wrappers 2. Make more wrappers
3. Write more documentation 3. Write more documentation
4. Wrappers should also accept actual keys for when you know them 4. Wrappers should also accept actual keys for when you know them
5. Add utility to manage the token easily to encourage reuse over 5. Use this:
new tokens every time
```python
try:
# NOTE: Maybe define self.session = requests.Session() in an __init__ or similar and use it here instead of the 'with' block for better performance.
# Session for connection pooling and performance improvement
with requests.Session() as session:
# Update session headers once instead of every request
session.headers.update(headers)
# GET requests to fetch normal and removed posts
res_n = session.get(url_normal, timeout=60)
# res_r = session.get(url_removed, timeout=60)
# Checking if both requests are good before proceeding
res_n.raise_for_status()
# res_r.raise_for_status()
```
Trust Trust

View file

@ -1,6 +1,6 @@
[project] [project]
name = "trustcafeapiwrapper" name = "trustcafeapiwrapper"
version = "0.1.0.11" version = "0.1.0.12"
description = "Wraps the Trust Cafe API" description = "Wraps the Trust Cafe API"
readme = "README.md" readme = "README.md"
requires-python = ">=3.11" requires-python = ">=3.11"

View file

@ -170,9 +170,11 @@ class APIClient(BaseModel):
# Get a new one if we don't have one or if the existing one is expired # Get a new one if we don't have one or if the existing one is expired
if not self.is_token_valid(): if not self.is_token_valid():
tokendata = self.sign_in() token_data = self.sign_in()
with open(token_data_path, "w") as f: with open(token_data_path, "w") as f:
json.dump(tokendata, f, indent=2) json.dump(token_data, f, indent=2)
return token_data
def run_job(self, job_function, *args, **kwargs): def run_job(self, job_function, *args, **kwargs):
""" """

View file

@ -26,7 +26,9 @@ API = APIClient(
# Keep a token cache to avoid unnecessary sign-ins during development. # Keep a token cache to avoid unnecessary sign-ins during development.
# (In production, you'd handle this more robustly and securely) # (In production, you'd handle this more robustly and securely)
API.handle_token() # This will load the token from file if it exists and is valid, or sign in to get a new one if not. token_data = API.handle_token() # This will load the token from file if it exists and is valid, or sign in to get a new one if not.
if token_data is None or 'access_token' not in token_data:
raise Exception("Failed to obtain a valid access token. Please check your credentials and token handling.")
''' '''
END IMPORTANT BIT END IMPORTANT BIT

30
usethis.py Normal file
View file

@ -0,0 +1,30 @@
async def grab_latest_posts(self) -> bool:
"""
Fetches the latest posts from the API.
If it's the first run (_time_last_scan is 0), checks all posts.
Otherwise, checks only posts created after the last scan time.
Looks in the foryou feed.
Returns:
bool: True if posts are successfully fetched and processed.
"""
# Grabbing latest posts...
print("\nBig Brother is watching you...")
headers = {"Authorization": f"Bearer {self._access_token}"}
url_normal = f"{str(self.api_url_content)}/post/foryou"
# url_removed = f"{str(self.api_url_content)}/post/removed"
try:
# NOTE: Maybe define self.session = requests.Session() in an __init__ or similar and use it here instead of the 'with' block for better performance.
# Session for connection pooling and performance improvement
with requests.Session() as session:
# Update session headers once instead of every request
session.headers.update(headers)
# GET requests to fetch normal and removed posts
res_n = session.get(url_normal, timeout=60)
# res_r = session.get(url_removed, timeout=60)
# Checking if both requests are good before proceeding
res_n.raise_for_status()
# res_r.raise_for_status()