#!/usr/bin/python3 import os import re import json import google_reader import tomllib import sys import argparse import asyncio import signal from datetime import datetime from zoneinfo import ZoneInfo from pathlib import Path from hashlib import sha256 from jinja2 import Template #%% Configuration class Config: def __init__(self): with open("config.default.toml", "rb") as f: default_config = tomllib.load(f) config_path = Path(os.environ.get("CONFIG_PATH") or "config.toml") if config_path.exists(): with config_path.open("rb") as f: config = tomllib.load(f) elif "CONFIG_PATH" in os.environ: print(f"Configuration file {config_path} does not exist; create it or change the CONFIG_PATH environment variable to another path") exit(1) else: config = {} def get_config(category, field, can_default=True): env_name = f"{category.upper()}_{field.upper()}" c = config.get(category, {}) if env_name in os.environ: return os.environ[env_name] elif field in c: return c[field] elif can_default: return default_config[category][field] else: print(f"Error while loading configuration: {category}.{field} not found in {config_path} nor in environment variable {env_name}", file=sys.stderr) exit(1) # Get config fields self.html_root: Path = Path(get_config("directories", "reader")) self.json_root: Path = Path(get_config("directories", "data")) self.server_url: str = str(get_config("server", "url", False)) self.server_user: str = str(get_config("server", "user", False)) self.server_password: str = str(get_config("server", "password", False)) self.items_per_query: int = int(get_config("server", "items_per_request")) self.timezone: ZoneInfo = ZoneInfo(str(get_config("datetime", "timezone"))) self.time_format: str = str(get_config("datetime", "format")) self.item_template: Template = Template(str(get_config("html", "template")), autoescape=True) self.item_filename_template: Template = Template(str(get_config("html", "filename_template")), autoescape=False) self.max_filename_length: int = int(get_config("html", "max_filename_length")) self.filename_translation = str.maketrans(get_config("html", "filename_replacement")) self.daemon_sync_up_every: int = int(get_config("daemon", "sync_up_every")) self.daemon_sync_down_every: int = int(get_config("daemon", "sync_down_every")) # Computed config fields self.update_lock = self.json_root / "update.lock" # Create missing directories self.html_root.mkdir(exist_ok=True) self.json_root.mkdir(exist_ok=True) #%% Interaction with server label_name = re.compile("user/.*/label/(.*)") class ClientSession: client: google_reader.Client auth_token: str csrf_token: str def __init__(self, config: Config): self.client = google_reader.Client(config.server_url) self.auth_token = self.client.login(config.server_user, config.server_password) self.csrf_token = self.client.get_token(self.auth_token) def mark_as_read(self, item_ids): self.client.edit_tags(self.auth_token, self.csrf_token, item_ids=item_ids, add_tags=[google_reader.STREAM_READ]) def list_folders(self): folders = [tag for tag in self.client.list_tags(self.auth_token) if tag.type == "folder"] l = [] for folder in folders: folder_name = folder.label or label_name.search(folder.id).group(1) folder_id = folder.id l.append((folder_name, folder_id)) return l def get_stream_items_ids(self, *args, **kwargs): return self.client.get_stream_items_ids(self.auth_token, *args, **kwargs) def get_stream_items_contents(self, *args, **kwargs): return self.client.get_stream_items_contents(self.auth_token, self.csrf_token, *args, **kwargs) #%% Regular feather operations def mark_deleted_as_read(config, client_session): # Mark items that are in the JSON directory but with missing HTML file as read on the server if config.update_lock.exists(): print("The previous synchronization was aborted, not marking any item as read in order to avoid collateral damage") return marked_as_read = 0 to_mark_as_read = [] for json_path in config.json_root.glob("*.json"): item_json = json.load(json_path.open("r")) html_path = config.html_root / item_json["html_path"] if not html_path.exists(): to_mark_as_read.append(item_json["id"]) # delete JSON file json_path.unlink() marked_as_read += 1 for i in range(0, len(to_mark_as_read), config.items_per_query): client_session.mark_as_read(to_mark_as_read[i:i+500]) print(f"Marked {marked_as_read} items as read") def escape_filename(config, filename): return filename.translate(config.filename_translation) def truncate_filename(config, filename): max_filename_length = config.max_filename_length filename_utf8 = filename.encode("utf-8") if len(filename_utf8) <= max_filename_length: return filename else: suffix = Path(filename).suffix max_basename_length = max_filename_length - len(suffix.encode("utf-8")) cutoff = len(filename.encode('utf-8')[:max_basename_length].decode('utf-8', errors="ignore")) return filename[:cutoff] + '…' + suffix def get_html_path(config, item_json): folder_directory = config.html_root / escape_filename(config, item_json["folder"]) folder_directory.mkdir(exist_ok=True) html_name = truncate_filename(config, escape_filename(config, config.item_filename_template.render(item_json))) return folder_directory / html_name def format_datetime(config, timestamp): return datetime.fromtimestamp(timestamp, config.timezone).strftime(config.time_format) def set_computed_fields_json(config, item_json): item_json["published_formatted"] = format_datetime(config, item_json["published"]) item_json["updated_formatted"] = format_datetime(config, item_json["updated"]) item_json["html_path"] = str(get_html_path(config, item_json).relative_to(config.html_root)) def synchronize_with_server(config, client_session): # Synchronize items from the server, generating and deleting JSON and HTML files accordingly config.update_lock.touch() print("Synchronizing with server...") new_items, updated_items = 0, 0 grabbed_item_paths = [] folders = client_session.list_folders() for (folder_name, folder_id) in folders: print(f" Updating folder {folder_name}") def process(item_ids): nonlocal new_items, updated_items, grabbed_item_paths if len(item_ids) > 0: item_contents = client_session.get_stream_items_contents(item_ids=item_ids) for item_content in item_contents.items: item_json = { "id": item_content.id, "folder": folder_name, "title": item_content.title, "published": item_content.published, "updated": item_content.updated, "author": item_content.author, "summary": item_content.summary.content, "content": item_content.content.content, "origin_title": item_content.origin.title, "origin_url": item_content.origin.html_url, "canonical_url": item_content.canonical[0].href, } set_computed_fields_json(config, item_json) json_path = config.json_root / f"{ sha256(item_json["id"].encode("utf-8")).hexdigest() }.json" grabbed_item_paths.append(json_path) write_files, updating = False, False if not json_path.exists(): write_files = True new_items += 1 else: old_item_json = json.load(json_path.open("r")) if item_json["updated"] > old_item_json["updated"]: write_files, updating = True, True updated_items += 1 if write_files: # write JSON with json_path.open("w") as f: json.dump(item_json, f) # write HTML generate_html_for_item(config, item_json, regenerate=updating) continuation = None while continuation != '': items = client_session.get_stream_items_ids(stream_id=folder_id, exclude_target="user/-/state/com.google/read", limit=config.items_per_query, continuation=continuation) item_ids = [item.id for item in items.item_refs] process(item_ids) continuation = items.continuation # Remove items that we didn't get from the server but are in the JSON directory removed_items = 0 for item_path in config.json_root.glob("*.json"): if not item_path in grabbed_item_paths: # remove HTML item_json = json.load(item_path.open("r")) remove_html_for_item(config, item_json, ignore_deleted=True) # ignore if file was deleted by user during sync # remove JSON item_path.unlink() removed_items += 1 print(f"Synchronization successful ({new_items} new items, {updated_items} updated, {removed_items} removed)") config.update_lock.unlink() def generate_html_for_item(config, item_json, regenerate=False): # Write HTML file for a JSON object html_path = config.html_root / item_json["html_path"] if html_path.exists() and not regenerate: print(f"WARNING: a file already exist for {html_path}. Either the feed has duplicate entries, or something has gone terribly wrong.") else: with html_path.open("w") as f: f.write(config.item_template.render(item_json)) # set accessed date to update time, modified to publication time os.utime(html_path, (max(item_json["updated"], item_json["updated"]), item_json["published"])) def remove_html_for_item(config, item_json, ignore_deleted=False): # Delete a HTML file for a JSON object html_path = config.html_root / item_json["html_path"] if not ignore_deleted or html_path.exists(): html_path.unlink() def remove_empty_html_directories(config): # Remove empty directories in the HTML directory html_root = config.html_root for (dirpath, dirnames, filenames) in html_root.walk(top_down=False): if dirpath != html_root: if len(dirnames) == 0 and len(filenames) == 0: dirpath.rmdir() def synchronize(config, client_session): # Do a full feather update mark_deleted_as_read(config, client_session) synchronize_with_server(config, client_session) remove_empty_html_directories(config) def synchronize_local_changes(config, client_session): # Upload local changes (read items) to the server mark_deleted_as_read(config, client_session) remove_empty_html_directories(config) def synchronize_remote_changes(config, client_session): # Download remote changes (new items, items read from another device) from the server synchronize_with_server(config, client_session) remove_empty_html_directories(config) async def daemon_sync_up_loop(config, client_session): while True: synchronize_local_changes(config, client_session) await asyncio.sleep(config.daemon_sync_up_every) async def daemon_sync_down_loop(config, client_session): while True: synchronize_remote_changes(config, client_session) await asyncio.sleep(config.daemon_sync_down_every) async def daemon(config, client_session): print(f"Started in daemon mode; changes will be downloaded from the server every {config.daemon_sync_down_every}s and uploaded every {config.daemon_sync_up_every}s") async with asyncio.TaskGroup() as tg: tup = tg.create_task(daemon_sync_up_loop(config, client_session)) tdown = tg.create_task(daemon_sync_down_loop(config, client_session)) def cancel_tasks(): tup.cancel() tdown.cancel() asyncio.get_running_loop().add_signal_handler(signal.SIGTERM, cancel_tasks) def regenerate_files(config): for json_path in config.json_root.glob("*.json"): item_json = json.load(json_path.open("r")) remove_html_for_item(config, item_json, ignore_deleted=True) # path might change so we preemptively remove the old file set_computed_fields_json(config, item_json) # recompute formatted datetime & path from the current configuration # rewrite JSON with json_path.open("w") as f: json.dump(item_json, f) # rewrite HTML generate_html_for_item(config, item_json, regenerate=True) def clear_data(config): for json_path in config.json_root.glob("*.json"): item_json = json.load(json_path.open("r")) remove_html_for_item(config, item_json, ignore_deleted=True) json_path.unlink() remove_empty_html_directories(config) #%% Run feather def main(): parser = argparse.ArgumentParser( prog="feather", description="file-based RSS reader" ) parser.add_argument( "action", choices=("sync", "sync-up", "sync-down", "daemon", "regenerate", "clear-data"), help="sync: perform a full synchronization with the server; sync-up: only synchronize local changes to the server (e.g. items read locally); sync-down: only synchronize remote change from the server (e.g. new items or items read from another device); daemon: start in daemon mode (will keep performing synchronizations periodically until process is stopped); regenerate: regenerate all HTML files from the local data; clear-data: remove all local data" ) args = parser.parse_args() config = Config() if args.action == "sync": client_session = ClientSession(config) synchronize(config, client_session) elif args.action == "sync-up": client_session = ClientSession(config) synchronize_local_changes(config, client_session) elif args.action == "sync-down": client_session = ClientSession(config) synchronize_remote_changes(config, client_session) elif args.action == "daemon": client_session = ClientSession(config) try: asyncio.run(daemon(config, client_session)) except KeyboardInterrupt: pass elif args.action == "regenerate": regenerate_files(config) elif args.action == "clear-data": clear_data(config) if __name__ == "__main__": main()