#!/usr/bin/python3 from __future__ import annotations import os import re import json import tomllib import sys import argparse import asyncio import signal from abc import ABC, abstractmethod from datetime import datetime from zoneinfo import ZoneInfo from pathlib import Path from hashlib import sha256 from jinja2 import Template from ttrss.client import TTRClient import google_reader #%% Configuration class Config: def __init__(self): with open("config.default.toml", "rb") as f: default_config = tomllib.load(f) config_path = Path(os.environ.get("CONFIG_PATH") or "config.toml") if config_path.exists(): with config_path.open("rb") as f: config = tomllib.load(f) elif "CONFIG_PATH" in os.environ: print(f"Configuration file {config_path} does not exist; create it or change the CONFIG_PATH environment variable to another path") exit(1) else: config = {} def get_config(category, field, can_default=True): env_name = f"{category.upper()}_{field.upper()}" c = config.get(category, {}) if env_name in os.environ: return os.environ[env_name] elif field in c: return c[field] elif can_default: return default_config[category][field] else: print(f"Error while loading configuration: {category}.{field} not found in {config_path} nor in environment variable {env_name}", file=sys.stderr) exit(1) # Get config fields self.html_root: Path = Path(get_config("directories", "reader")) self.json_root: Path = Path(get_config("directories", "data")) self.server_api: str = str(get_config("server", "api")) self.server_url: str = str(get_config("server", "url", False)) self.server_user: str = str(get_config("server", "user", False)) self.server_password: str = str(get_config("server", "password", False)) self.items_per_query: int = int(get_config("server", "items_per_request")) self.timezone: ZoneInfo = ZoneInfo(str(get_config("datetime", "timezone"))) self.time_format: str = str(get_config("datetime", "format")) self.item_template: Template = Template(str(get_config("html", "template")), autoescape=True) self.item_filename_template: Template = Template(str(get_config("html", "filename_template")), autoescape=False) self.item_category_template: Template = Template(str(get_config("html", "category_template")), autoescape=False) self.max_filename_length: int = int(get_config("html", "max_filename_length")) self.filename_translation = str.maketrans(get_config("html", "filename_replacement")) self.daemon_sync_up_every: int = int(get_config("daemon", "sync_up_every")) self.daemon_sync_down_every: int = int(get_config("daemon", "sync_down_every")) # Computed config fields self.update_lock = self.json_root / "update.lock" # Create missing directories self.html_root.mkdir(exist_ok=True) self.json_root.mkdir(exist_ok=True) #%% Interaction with server type Id = int | str class Article(ABC): id: Id title: str = "" published: int = 0 updated: int = 0 author: str = "" summary: str = "" content: str = "" feed_title: str = "" feed_url: str = "" feed_icon_url: str = "" feed_order: int = 0 article_url: str = "" comments_url: str = "" language: str = "" image_url: str = "" def asdict(self): return { "id": self.id, "title": self.title, "published": self.published, "updated": self.updated, "author": self.author, "summary": self.summary, "content": self.content, "feed_title": self.feed_title, "feed_url": self.feed_url, "feed_icon_url": self.feed_icon_url, "feed_order": self.feed_order, "article_url": self.article_url, "comments_url": self.comments_url, "language": self.language, "image_url": self.image_url, } class GReaderArticle(Article): def __init__(self, session: GReaderSession, item_content): self.id = item_content.id self.title = item_content.title self.published = item_content.published self.updated = item_content.updated self.author = item_content.author self.summary = item_content.summary.content self.content = item_content.content.content self.feed_title = item_content.origin.title self.feed_url = item_content.origin.html_url self.article_url = item_content.canonical[0].href class TTRArticle(Article): def __init__(self, session: TRRSession, article): self.id = article.id self.title = article.title self.published = article.updated.timestamp() self.updated = article.updated.timestamp() self.author = article.author self.summary = article.excerpt self.content = article.content self.feed_title = article.feed_title self.feed_url = article.site_url self.feed_icon_url = session.feeds[article.feed_id]["icon"] self.feed_order = session.feeds[article.feed_id]["order"] self.article_url = article.link self.comments_url = article.comments_link self.language = article.lang self.image_url = article.flavor_image class Category: id: Id title: str parents: list[Category] order: int = 0 def __init__(self, id, title, parents=[], order=0): self.id = id self.title = title self.parents = parents self.order = order def asdict(self): return { "id": self.id, "title": self.title, "parents": [ dir.asdict() for dir in self.parents ], "order": self.order } class ClientSession(ABC): @abstractmethod def mark_as_read(self, item_ids: list[Id]): """ Mark all the given articles as read. """ pass @abstractmethod def list_categories(self) -> list[Category]: """ Returns a list of all the categories on the server. """ pass @abstractmethod def get_unread_articles_in_category(self, category_id: Id, limit: int, continuation: int=0) -> list[Article]: """ Returns a list of Articles in the given category. limit and continuation are required for pagination. """ pass label_name = re.compile("user/.*/label/(.*)") class GReaderSession(ClientSession): greader: google_reader.Client auth_token: str csrf_token: str def __init__(self, config: Config): self.greader = google_reader.Client(config.server_url) self.auth_token = self.greader.login(config.server_user, config.server_password) self.csrf_token = self.greader.get_token(self.auth_token) def mark_as_read(self, item_ids: list[Id]): self.greader.edit_tags(self.auth_token, self.csrf_token, item_ids=item_ids, add_tags=[google_reader.STREAM_READ]) def list_categories(self): categories = [tag for tag in self.greader.list_tags(self.auth_token) if tag.type == "folder"] l = [] for category in categories: category_name = category.label or label_name.search(category.id).group(1) category_id = category.id l.append(Category(id=category_id, title=category_name)) return l def get_unread_articles_in_category(self, category_id, limit=500, continuation=0): items_ids = self.greader.get_stream_items_ids(self.auth_token, stream_id=category_id, exclude_target="user/-/state/com.google/read", limit=limit, continuation=continuation) item_contents = self.greader.get_stream_items_contents(self.auth_token, self.csrf_token, item_ids=[item.id for item in items.item_refs]) return [ GReaderArticle(self, item_content) for item_content in item_contents.items ] class TRRSession(ClientSession): ttrss: TTRClient feeds: dict def __init__(self, config: Config): self.ttrss = TTRClient(config.server_url, config.server_user, config.server_password, auto_login=True) self.ttrss.login() self.feeds = {} def mark_as_read(self, item_ids): self.ttrss.mark_read(item_ids) def list_categories(self): self.feeds = {} def get_categories_recursive(parent_category, parent_categories=[]): categories = [] index = 0 for item in parent_category["items"]: # skip special categories and feeds if item["bare_id"] <= 0: continue # category elif item.get("type") == "category": category = Category(id=item["bare_id"], parents=parent_categories, title=item["name"], order=index) categories.append(category) categories += get_categories_recursive(item, parent_categories+[category]) # feeds elif "type" not in item: self.feeds[item["bare_id"]] = item self.feeds[item["bare_id"]]["order"] = index index += 1 return categories tree = self.ttrss.get_feed_tree() return get_categories_recursive(tree["categories"]) def get_unread_articles_in_category(self, category_id, limit=100, continuation=0): headlines = self.ttrss.get_headlines(feed_id=category_id, limit=limit, skip=continuation, is_cat=True, show_excerpt=True, show_content=True, view_mode="unread", include_attachments=True, include_nested=False) return [ TTRArticle(self, headline) for headline in headlines ] def make_client_session(config: Config) -> ClientSession: api = config.server_api if api == "googlereader": return GReaderSession(config) elif api == "ttrss": return TRRSession(config) else: print(f"Configuration error: server.api must be either ttrss or googlereader", file=sys.stderr) exit(1) #%% Regular feather operations def mark_deleted_as_read(config, client_session): # Mark items that are in the JSON directory but with missing HTML file as read on the server if config.update_lock.exists(): print("The previous synchronization was aborted, not marking any item as read in order to avoid collateral damage") return marked_as_read = 0 to_mark_as_read = [] for json_path in config.json_root.glob("*.json"): item_json = json.load(json_path.open("r")) html_path = config.html_root / item_json["html_path"] if not html_path.exists(): to_mark_as_read.append(item_json["id"]) # delete JSON file json_path.unlink() marked_as_read += 1 for i in range(0, len(to_mark_as_read), config.items_per_query): client_session.mark_as_read(to_mark_as_read[i:i+500]) print(f"Marked {marked_as_read} items as read") def escape_filename(config, filename): return filename.translate(config.filename_translation) def truncate_filename(config, filename): max_filename_length = config.max_filename_length filename_utf8 = filename.encode("utf-8") if len(filename_utf8) <= max_filename_length: return filename else: suffix = Path(filename).suffix max_basename_length = max_filename_length - len(suffix.encode("utf-8")) cutoff = len(filename.encode('utf-8')[:max_basename_length].decode('utf-8', errors="ignore")) return filename[:cutoff] + '…' + suffix def get_html_path(config, item_json): category_directory = config.html_root for category in item_json["category"]["parents"]: category_directory /= escape_filename(config, config.item_category_template.render(category)) category_directory /= escape_filename(config, config.item_category_template.render(item_json["category"])) category_directory.mkdir(parents=True, exist_ok=True) # TODO move html_name = truncate_filename(config, escape_filename(config, config.item_filename_template.render(item_json))) return category_directory / html_name def format_datetime(config, timestamp): return datetime.fromtimestamp(timestamp, config.timezone).strftime(config.time_format) def set_computed_fields_json(config, item_json): # TODO: clean item_json["published_formatted"] = format_datetime(config, item_json["published"]) item_json["updated_formatted"] = format_datetime(config, item_json["updated"]) item_json["html_path"] = str(get_html_path(config, item_json).relative_to(config.html_root)) def synchronize_with_server(config, client_session): # Synchronize items from the server, generating and deleting JSON and HTML files accordingly config.update_lock.touch() print("Synchronizing with server...") new_items, updated_items = 0, 0 grabbed_item_paths = [] categories = client_session.list_categories() for category in categories: print(f" Updating category {category.title}") remaining, continuation = True, 0 while remaining: articles = client_session.get_unread_articles_in_category(category.id, limit=config.items_per_query, continuation=continuation) if len(articles) >= config.items_per_query: continuation += len(articles) else: remaining = False for item in articles: item_json = item.asdict() item_json["category"] = category.asdict() set_computed_fields_json(config, item_json) json_path = config.json_root / f"{ sha256(str(item_json["id"]).encode("utf-8")).hexdigest() }.json" grabbed_item_paths.append(json_path) write_files, updating = False, False if not json_path.exists(): write_files = True new_items += 1 else: old_item_json = json.load(json_path.open("r")) if item_json["updated"] > old_item_json["updated"]: write_files, updating = True, True updated_items += 1 if write_files: # write JSON with json_path.open("w") as f: json.dump(item_json, f) # write HTML generate_html_for_item(config, item_json, regenerate=updating) # Remove items that we didn't get from the server but are in the JSON directory removed_items = 0 for item_path in config.json_root.glob("*.json"): if not item_path in grabbed_item_paths: # remove HTML item_json = json.load(item_path.open("r")) remove_html_for_item(config, item_json, ignore_deleted=True) # ignore if file was deleted by user during sync # remove JSON item_path.unlink() removed_items += 1 print(f"Synchronization successful ({new_items} new items, {updated_items} updated, {removed_items} removed)") config.update_lock.unlink() def generate_html_for_item(config, item_json, regenerate=False): # Write HTML file for a JSON object html_path = config.html_root / item_json["html_path"] if html_path.exists() and not regenerate: print(f"WARNING: a file already exist for {html_path}. Either the feed has duplicate entries, or something has gone terribly wrong.") else: with html_path.open("w") as f: f.write(config.item_template.render(item_json)) # set accessed date to update time, modified to publication time os.utime(html_path, (max(item_json["updated"], item_json["updated"]), item_json["published"])) def remove_html_for_item(config, item_json, ignore_deleted=False): # Delete a HTML file for a JSON object html_path = config.html_root / item_json["html_path"] if not ignore_deleted or html_path.exists(): html_path.unlink() def remove_empty_html_directories(config): # Remove empty directories in the HTML directory html_root = config.html_root removed_directories = set() for (dirpath, dirnames, filenames) in html_root.walk(top_down=False): if dirpath != html_root: is_empty = len(filenames) == 0 if is_empty and len(dirnames) > 0: # some subdirectories may have been removed in an earlier iteration for subdirname in dirnames: if dirpath / subdirname not in removed_directories: is_empty = False break if is_empty: dirpath.rmdir() removed_directories.add(dirpath) def synchronize(config, client_session): # Do a full feather update mark_deleted_as_read(config, client_session) synchronize_with_server(config, client_session) remove_empty_html_directories(config) def synchronize_local_changes(config, client_session): # Upload local changes (read items) to the server mark_deleted_as_read(config, client_session) remove_empty_html_directories(config) def synchronize_remote_changes(config, client_session): # Download remote changes (new items, items read from another device) from the server synchronize_with_server(config, client_session) remove_empty_html_directories(config) async def daemon_sync_up_loop(config, client_session): while True: synchronize_local_changes(config, client_session) await asyncio.sleep(config.daemon_sync_up_every) async def daemon_sync_down_loop(config, client_session): while True: synchronize_remote_changes(config, client_session) await asyncio.sleep(config.daemon_sync_down_every) async def daemon(config, client_session): print(f"Started in daemon mode; changes will be downloaded from the server every {config.daemon_sync_down_every}s and uploaded every {config.daemon_sync_up_every}s") async with asyncio.TaskGroup() as tg: tup = tg.create_task(daemon_sync_up_loop(config, client_session)) tdown = tg.create_task(daemon_sync_down_loop(config, client_session)) def cancel_tasks(): tup.cancel() tdown.cancel() asyncio.get_running_loop().add_signal_handler(signal.SIGTERM, cancel_tasks) def regenerate_files(config): for json_path in config.json_root.glob("*.json"): item_json = json.load(json_path.open("r")) remove_html_for_item(config, item_json, ignore_deleted=True) # path might change so we preemptively remove the old file set_computed_fields_json(config, item_json) # recompute formatted datetime & path from the current configuration # rewrite JSON with json_path.open("w") as f: json.dump(item_json, f) # rewrite HTML generate_html_for_item(config, item_json, regenerate=True) def clear_data(config): for json_path in config.json_root.glob("*.json"): item_json = json.load(json_path.open("r")) remove_html_for_item(config, item_json, ignore_deleted=True) json_path.unlink() remove_empty_html_directories(config) #%% Run feather def main(): parser = argparse.ArgumentParser( prog="feather", description="file-based RSS reader client" ) parser.add_argument( "action", choices=("sync", "sync-up", "sync-down", "daemon", "regenerate", "clear-data"), help="sync: perform a full synchronization with the server; sync-up: only synchronize local changes to the server (e.g. items read locally); sync-down: only synchronize remote change from the server (e.g. new items or items read from another device); daemon: start in daemon mode (will keep performing synchronizations periodically until process is stopped); regenerate: regenerate all HTML files from the local data; clear-data: remove all local data" ) args = parser.parse_args() config = Config() if args.action == "sync": client_session = make_client_session(config) synchronize(config, client_session) elif args.action == "sync-up": client_session = make_client_session(config) synchronize_local_changes(config, client_session) elif args.action == "sync-down": client_session = make_client_session(config) synchronize_remote_changes(config, client_session) elif args.action == "daemon": client_session = make_client_session(config) try: asyncio.run(daemon(config, client_session)) except KeyboardInterrupt: pass elif args.action == "regenerate": regenerate_files(config) elif args.action == "clear-data": clear_data(config) if __name__ == "__main__": main()