30 lines
No EOL
1.3 KiB
Python
30 lines
No EOL
1.3 KiB
Python
async def grab_latest_posts(self) -> bool:
|
|
"""
|
|
Fetches the latest posts from the API.
|
|
|
|
If it's the first run (_time_last_scan is 0), checks all posts.
|
|
Otherwise, checks only posts created after the last scan time.
|
|
Looks in the foryou feed.
|
|
|
|
Returns:
|
|
bool: True if posts are successfully fetched and processed.
|
|
"""
|
|
# Grabbing latest posts...
|
|
print("\nBig Brother is watching you...")
|
|
headers = {"Authorization": f"Bearer {self._access_token}"}
|
|
url_normal = f"{str(self.api_url_content)}/post/foryou"
|
|
# url_removed = f"{str(self.api_url_content)}/post/removed"
|
|
try:
|
|
# NOTE: Maybe define self.session = requests.Session() in an __init__ or similar and use it here instead of the 'with' block for better performance.
|
|
# Session for connection pooling and performance improvement
|
|
with requests.Session() as session:
|
|
# Update session headers once instead of every request
|
|
session.headers.update(headers)
|
|
|
|
# GET requests to fetch normal and removed posts
|
|
res_n = session.get(url_normal, timeout=60)
|
|
# res_r = session.get(url_removed, timeout=60)
|
|
|
|
# Checking if both requests are good before proceeding
|
|
res_n.raise_for_status()
|
|
# res_r.raise_for_status() |