feather/main.py


import os
import re
import json
import google_reader
import tomllib
import sys
from datetime import datetime
from zoneinfo import ZoneInfo
from pathlib import Path
from hashlib import sha256
from jinja2 import Template
#%% Configuration
class Config:
    def __init__(self):
        with open("config.default.toml", "rb") as f:
            default_config = tomllib.load(f)
        config_path = os.environ.get("CONFIG_PATH") or "config.toml"
        with open(config_path, "rb") as f:
            config = tomllib.load(f)

        def get_config(category, field, can_default=True):
            env_name = f"{category.upper()}_{field.upper()}"
            c = config.get(category, {})
            if env_name in os.environ:
                return os.environ[env_name]
            elif field in c:
                return c[field]
            elif can_default:
                return default_config[category][field]
            else:
                print(f"Error while loading configuration: {category}.{field} not found in {config_path} nor in environment variable {env_name}", file=sys.stderr)
                sys.exit(1)

        # Get config fields
        self.html_root: Path = Path(get_config("directories", "reader"))
        self.json_root: Path = Path(get_config("directories", "data"))
        self.server_url: str = str(get_config("server", "url", False))
        self.server_user: str = str(get_config("server", "user", False))
        self.server_password: str = str(get_config("server", "password", False))
        self.items_per_query: int = int(get_config("server", "items_per_request"))
        self.timezone: ZoneInfo = ZoneInfo(get_config("time", "timezone"))
        self.time_format: str = str(get_config("time", "format"))
        self.item_template: Template = Template(get_config("html", "template"), autoescape=True)
        self.item_filename_template: Template = Template(get_config("html", "filename_template"), autoescape=False)
        self.max_filename_length: int = int(get_config("html", "max_filename_length"))
        # Computed config fields
        self.update_lock = self.json_root / "update.lock"
        # Create missing directories
        self.html_root.mkdir(exist_ok=True)
        self.json_root.mkdir(exist_ok=True)
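
# A rough sketch of the config.toml layout read above. Only the section and key
# names are taken from the lookups in Config.__init__; the values shown here are
# hypothetical examples, not the project's defaults from config.default.toml.
#
#   [directories]
#   reader = "reader"   # where the browsable HTML files are written
#   data = "data"       # where the JSON state files are written
#
#   [server]
#   url = "https://rss.example.com/api/greader.php"   # hypothetical endpoint
#   user = "alice"
#   password = "hunter2"
#   items_per_request = 500
#
#   [time]
#   timezone = "Europe/Paris"
#   format = "%Y-%m-%d %H:%M"
#
#   [html]
#   template = "<h1>{{ title }}</h1>{{ content }}"   # Jinja2 template for an item page
#   filename_template = "{{ title }}.html"           # Jinja2 template for the file name
#   max_filename_length = 255
#
# Every field can also be overridden from the environment as CATEGORY_FIELD, e.g.
# SERVER_PASSWORD=hunter2 CONFIG_PATH=/etc/feather/config.toml python main.py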

#%% Interaction with server
label_name = re.compile("user/.*/label/(.*)")

class ClientSession:
    client: google_reader.Client
    auth_token: str
    csrf_token: str

    def __init__(self, config: Config):
        self.client = google_reader.Client(config.server_url)
        self.auth_token = self.client.login(config.server_user, config.server_password)
        self.csrf_token = self.client.get_token(self.auth_token)

    def mark_as_read(self, item_ids):
        self.client.edit_tags(self.auth_token, self.csrf_token, item_ids=item_ids, add_tags=[google_reader.STREAM_READ])

    def list_folders(self):
        folders = [tag for tag in self.client.list_tags(self.auth_token) if tag.type == "folder"]
        l = []
        for folder in folders:
            folder_name = folder.label or label_name.search(folder.id).group(1)
            folder_id = folder.id
            l.append((folder_name, folder_id))
        return l

    def get_stream_items_ids(self, *args, **kwargs):
        return self.client.get_stream_items_ids(self.auth_token, *args, **kwargs)

    def get_stream_items_contents(self, *args, **kwargs):
        return self.client.get_stream_items_contents(self.auth_token, self.csrf_token, *args, **kwargs)
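
# When the server does not send a human-readable label, list_folders() falls back
# on the label_name regex above: a tag id shaped like "user/-/label/News"
# (hypothetical id, the exact form depends on the server) yields the folder name "News".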

#%% Regular feather operations
def mark_deleted_as_read(config, client_session):
    # Mark items that are in the JSON directory but with missing HTML file as read on the server
    if config.update_lock.exists():
        print("The previous synchronization was aborted, not marking any item as read in order to avoid collateral damage")
        return
    marked_as_read = 0
    to_mark_as_read = []
    for stored_item in config.json_root.glob("*.json"):
        item_json = json.load(stored_item.open("r"))
        html_path = config.html_root / item_json["html_path"]
        if not html_path.exists():
            to_mark_as_read.append(item_json["id"])
            # delete JSON file
            stored_item.unlink()
            marked_as_read += 1
    # Send the read flags to the server in batches of at most items_per_query ids
    for i in range(0, len(to_mark_as_read), config.items_per_query):
        client_session.mark_as_read(to_mark_as_read[i:i+config.items_per_query])
    print(f"Marked {marked_as_read} items as read")

def escape_filename(filename):
    return filename.replace("/", "-")

def truncate_filename(config, filename):
    max_filename_length = config.max_filename_length
    filename_utf8 = filename.encode("utf-8")
    if len(filename_utf8) <= max_filename_length:
        return filename
    else:
        suffix = Path(filename).suffix
        max_basename_length = max_filename_length - len(suffix.encode("utf-8"))
        # cut at a UTF-8 character boundary so the truncated name stays valid
        cutoff = len(filename.encode("utf-8")[:max_basename_length].decode("utf-8", errors="ignore"))
        return filename[:cutoff] + suffix
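
# Hypothetical example: with max_filename_length = 20, the name
# "a very long article title.html" keeps the ".html" suffix and the stem is cut
# at a UTF-8 character boundary after 15 bytes, giving "a very long art.html".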

def get_html_path(config, item_json):
    folder_directory = config.html_root / escape_filename(item_json["folder"])
    folder_directory.mkdir(exist_ok=True)
    html_name = truncate_filename(config, escape_filename(config.item_filename_template.render(item_json)))
    return folder_directory / html_name
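
# Both Jinja2 templates are rendered with the item_json dictionary built in
# synchronize_with_server() below: the filename template can use id, folder,
# title, published, published_formatted, updated, updated_formatted, author,
# summary, content, origin_title, origin_url and canonical_url; the HTML
# template additionally sees html_path, which is added after the file name is chosen.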

def format_datetime(config, timestamp):
    return datetime.fromtimestamp(timestamp, config.timezone).strftime(config.time_format)

def synchronize_with_server(config, client_session):
    # Synchronize items from the server, generating and deleting JSON and HTML files accordingly
    config.update_lock.touch()
    print("Synchronizing with server...")
    new_items = 0
    grabbed_item_paths = []
    folders = client_session.list_folders()
    for (folder_name, folder_id) in folders:
        print(f" Updating folder {folder_name}")

        def process(item_ids):
            nonlocal new_items, grabbed_item_paths
            if len(item_ids) > 0:
                item_contents = client_session.get_stream_items_contents(item_ids=item_ids)
                for item_content in item_contents.items:
                    item_json = {
                        "id": item_content.id,
                        "folder": folder_name,
                        "title": item_content.title,
                        "published": item_content.published,
                        "published_formatted": format_datetime(config, item_content.published),
                        "updated": item_content.updated,
                        "updated_formatted": format_datetime(config, item_content.updated),
                        "author": item_content.author,
                        "summary": item_content.summary.content,
                        "content": item_content.content.content,
                        "origin_title": item_content.origin.title,
                        "origin_url": item_content.origin.html_url,
                        "canonical_url": item_content.canonical[0].href,
                    }
                    item_json["html_path"] = str(get_html_path(config, item_json).relative_to(config.html_root))
                    json_path = config.json_root / f"{sha256(item_json['id'].encode('utf-8')).hexdigest()}.json"
                    grabbed_item_paths.append(json_path)
                    if not json_path.exists():
                        # write JSON
                        with json_path.open("w") as f:
                            json.dump(item_json, f)
                        # write HTML
                        generate_html_for_item(config, item_json)
                        new_items += 1

        # Page through the folder's unread items using the continuation token
        continuation = None
        while continuation != '':
            items = client_session.get_stream_items_ids(stream_id=folder_id, exclude_target="user/-/state/com.google/read", limit=config.items_per_query, continuation=continuation)
            item_ids = [item.id for item in items.item_refs]
            process(item_ids)
            continuation = items.continuation
    # Remove items that we didn't get from the server but are in the JSON directory
    removed_items = 0
    for item_path in config.json_root.glob("*.json"):
        if item_path not in grabbed_item_paths:
            # remove HTML
            item_json = json.load(item_path.open("r"))
            remove_html_for_item(config, item_json, ignore_deleted=True)  # ignore if file was deleted by user during sync
            # remove JSON
            item_path.unlink()
            removed_items += 1
    print(f"Synchronization successful ({new_items} new items, {removed_items} removed)")
    config.update_lock.unlink()

def generate_html_for_item(config, item_json):
    # Write the HTML file for a JSON object
    html_path = config.html_root / item_json["html_path"]
    if html_path.exists():
        print(f"WARNING: a file already exists for {html_path}. Either the feed has duplicate entries, or something has gone terribly wrong.")
    else:
        with html_path.open("w") as f:
            f.write(config.item_template.render(item_json))

def remove_html_for_item(config, item_json, ignore_deleted=False):
    # Delete the HTML file for a JSON object
    html_path = config.html_root / item_json["html_path"]
    if not ignore_deleted or html_path.exists():
        html_path.unlink()

def remove_empty_html_directories(config):
    # Remove empty directories in the HTML directory
    html_root = config.html_root
    for (dirpath, dirnames, filenames) in html_root.walk(top_down=False):
        if dirpath != html_root:
            if len(dirnames) == 0 and len(filenames) == 0:
                dirpath.rmdir()

def process(config, client_session):
    # Do a full feather update
    mark_deleted_as_read(config, client_session)
    synchronize_with_server(config, client_session)
    remove_empty_html_directories(config)

#%% Run feather
def main():
    config = Config()
    client_session = ClientSession(config)
    process(config, client_session)

if __name__ == "__main__":
    main()