feather/feather.py
#!/usr/bin/python3
from __future__ import annotations
import os
import re
import json
import tomllib
import sys
import argparse
import asyncio
import signal
from abc import ABC, abstractmethod
from datetime import datetime
from zoneinfo import ZoneInfo
from pathlib import Path
from hashlib import sha256
from jinja2 import Template
from ttrss.client import TTRClient
import google_reader
#%% Configuration
class Config:
    def __init__(self):
        with open("config.default.toml", "rb") as f:
            default_config = tomllib.load(f)
        config_path = Path(os.environ.get("CONFIG_PATH") or "config.toml")
        if config_path.exists():
            with config_path.open("rb") as f:
                config = tomllib.load(f)
        elif "CONFIG_PATH" in os.environ:
            print(f"Configuration file {config_path} does not exist; create it or change the CONFIG_PATH environment variable to another path")
            exit(1)
        else:
            config = {}
        def get_config(category, field, can_default=True):
            env_name = f"{category.upper()}_{field.upper()}"
            c = config.get(category, {})
            if env_name in os.environ:
                return os.environ[env_name]
            elif field in c:
                return c[field]
            elif can_default:
                return default_config[category][field]
            else:
                print(f"Error while loading configuration: {category}.{field} not found in {config_path} nor in environment variable {env_name}", file=sys.stderr)
                exit(1)
        # Get config fields
        self.html_root: Path = Path(get_config("directories", "reader"))
        self.json_root: Path = Path(get_config("directories", "data"))
        self.server_api: str = str(get_config("server", "api"))
        self.server_url: str = str(get_config("server", "url", False))
        self.server_user: str = str(get_config("server", "user", False))
        self.server_password: str = str(get_config("server", "password", False))
        self.items_per_query: int = int(get_config("server", "items_per_request"))
        self.timezone: ZoneInfo = ZoneInfo(str(get_config("datetime", "timezone")))
        self.time_format: str = str(get_config("datetime", "format"))
        self.item_template: Template = Template(str(get_config("html", "template")), autoescape=True)
        self.item_filename_template: Template = Template(str(get_config("html", "filename_template")), autoescape=False)
        self.item_category_template: Template = Template(str(get_config("html", "category_template")), autoescape=False)
        self.max_filename_length: int = int(get_config("html", "max_filename_length"))
        self.filename_translation = str.maketrans(get_config("html", "filename_replacement"))
        self.daemon_sync_up_every: int = int(get_config("daemon", "sync_up_every"))
        self.daemon_sync_down_every: int = int(get_config("daemon", "sync_down_every"))
        # Computed config fields
        self.update_lock = self.json_root / "update.lock"
        # Create missing directories
        self.html_root.mkdir(exist_ok=True)
        self.json_root.mkdir(exist_ok=True)
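# For reference, a minimal config.toml sketch matching the fields read above.
# Section and key names come from the get_config() calls; the values below are
# purely illustrative and are not the defaults shipped in config.default.toml:
#
#   [directories]
#   reader = "reader"          # where the browsable HTML tree is written
#   data = "data"              # where per-item JSON state is kept
#
#   [server]
#   api = "googlereader"       # or "ttrss"
#   url = "https://rss.example.org"
#   user = "alice"
#   password = "hunter2"
#   items_per_request = 100
#
#   [datetime]
#   timezone = "Europe/Paris"
#   format = "%Y-%m-%d %H:%M"
#
#   [html]
#   template = "<h1>{{ title }}</h1>{{ content }}"   # Jinja2, rendered once per item
#   filename_template = "{{ title }}.html"
#   category_template = "{{ title }}"
#   max_filename_length = 255
#   filename_replacement = { "/" = "_" }             # passed to str.maketrans
#
#   [daemon]
#   sync_up_every = 60
#   sync_down_every = 600
#
# Every value can also be overridden through the environment using the
# CATEGORY_FIELD name upper-cased, e.g. SERVER_URL or DAEMON_SYNC_DOWN_EVERY.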
#%% Interaction with server
type Id = int | str
class Article(ABC):
    id: Id
    title: str = ""
    published: int = 0
    updated: int = 0
    author: str = ""
    summary: str = ""
    content: str = ""
    feed_title: str = ""
    feed_url: str = ""
    feed_icon_url: str = ""
    feed_order: int = 0
    article_url: str = ""
    comments_url: str = ""
    language: str = ""
    image_url: str = ""
    def asdict(self):
        return {
            "id": self.id,
            "title": self.title,
            "published": self.published,
            "updated": self.updated,
            "author": self.author,
            "summary": self.summary,
            "content": self.content,
            "feed_title": self.feed_title,
            "feed_url": self.feed_url,
            "feed_icon_url": self.feed_icon_url,
            "feed_order": self.feed_order,
            "article_url": self.article_url,
            "comments_url": self.comments_url,
            "language": self.language,
            "image_url": self.image_url,
        }
class GReaderArticle(Article):
    def __init__(self, session: GReaderSession, item_content):
        self.id = item_content.id
        self.title = item_content.title
        self.published = item_content.published
        self.updated = item_content.updated
        self.author = item_content.author
        self.summary = item_content.summary.content
        self.content = item_content.content.content
        self.feed_title = item_content.origin.title
        self.feed_url = item_content.origin.html_url
        self.article_url = item_content.canonical[0].href
class TTRArticle(Article):
    def __init__(self, session: TRRSession, article):
        self.id = article.id
        self.title = article.title
        self.published = article.updated.timestamp()
        self.updated = article.updated.timestamp()
        self.author = article.author
        self.summary = article.excerpt
        self.content = article.content
        self.feed_title = article.feed_title
        self.feed_url = article.site_url
        self.feed_icon_url = session.feeds[article.feed_id]["icon"]
        self.feed_order = session.feeds[article.feed_id]["order"]
        self.article_url = article.link
        self.comments_url = article.comments_link
        self.language = article.lang
        self.image_url = article.flavor_image
class Category:
    id: Id
    title: str
    parents: list[Category]
    order: int = 0
    def __init__(self, id, title, parents=[], order=0):
        self.id = id
        self.title = title
        self.parents = parents
        self.order = order
    def asdict(self):
        return {
            "id": self.id,
            "title": self.title,
            "parents": [ dir.asdict() for dir in self.parents ],
            "order": self.order
        }
class ClientSession(ABC):
    @abstractmethod
    def mark_as_read(self, item_ids: list[Id]):
        """
        Mark all the given articles as read.
        """
        pass
    @abstractmethod
    def list_folders(self) -> list[Category]:
        """
        Returns a list of all the categories on the server.
        """
        pass
    @abstractmethod
    def get_unread_articles_in_folder(self, folder_id: Id, limit: int, continuation: int=0) -> list[Article]:
        """
        Returns a list of Articles in the given category. limit and continuation are required for pagination.
        """
        pass
label_name = re.compile("user/.*/label/(.*)")
class GReaderSession(ClientSession):
    greader: google_reader.Client
    auth_token: str
    csrf_token: str
    def __init__(self, config: Config):
        self.greader = google_reader.Client(config.server_url)
        self.auth_token = self.greader.login(config.server_user, config.server_password)
        self.csrf_token = self.greader.get_token(self.auth_token)
    def mark_as_read(self, item_ids: list[Id]):
        self.greader.edit_tags(self.auth_token, self.csrf_token, item_ids=item_ids, add_tags=[google_reader.STREAM_READ])
    def list_folders(self):
        folders = [tag for tag in self.greader.list_tags(self.auth_token) if tag.type == "folder"]
        l = []
        for folder in folders:
            folder_name = folder.label or label_name.search(folder.id).group(1)
            folder_id = folder.id
            l.append(Category(id=folder_id, title=folder_name))
        return l
    def get_unread_articles_in_folder(self, folder_id, limit=500, continuation=0):
        items_ids = self.greader.get_stream_items_ids(self.auth_token, stream_id=folder_id, exclude_target="user/-/state/com.google/read", limit=limit, continuation=continuation)
        item_contents = self.greader.get_stream_items_contents(self.auth_token, self.csrf_token, item_ids=[item.id for item in items_ids.item_refs])
        return [ GReaderArticle(self, item_content) for item_content in item_contents.items ]
class TRRSession(ClientSession):
    ttrss: TTRClient
    feeds: dict
    def __init__(self, config: Config):
        self.ttrss = TTRClient(config.server_url, config.server_user, config.server_password, auto_login=True)
        self.ttrss.login()
        self.feeds = {}
    def mark_as_read(self, item_ids):
        self.ttrss.mark_read(item_ids)
    def list_folders(self):
        self.feeds = {}
        def get_categories_recursive(parent_category, parent_categories=[]):
            categories = []
            for i in range(len(parent_category["items"])):
                item = parent_category["items"][i]
                # skip special categories and feeds
                if item["bare_id"] <= 0:
                    continue
                # category
                elif item.get("type") == "category":
                    category = Category(id=item["bare_id"], parents=parent_categories, title=item["name"], order=i)
                    categories.append(category)
                    categories += get_categories_recursive(item, parent_categories+[category])
                # feeds
                elif "type" not in item:
                    self.feeds[item["bare_id"]] = item
                    self.feeds[item["bare_id"]]["order"] = i
            return categories
        tree = self.ttrss.get_feed_tree()
        return get_categories_recursive(tree["categories"])
    def get_unread_articles_in_folder(self, folder_id, limit=100, continuation=0):
        headlines = self.ttrss.get_headlines(feed_id=folder_id, limit=limit, skip=continuation, is_cat=True, show_excerpt=True, show_content=True, view_mode="unread", include_attachments=True, include_nested=False)
        return [ TTRArticle(self, headline) for headline in headlines ]
def make_client_session(config: Config):
    api = config.server_api
    if api == "googlereader":
        return GReaderSession(config)
    elif api == "ttrss":
        return TRRSession(config)
    else:
        print("Configuration error: server.api must be either ttrss or googlereader", file=sys.stderr)
        exit(1)
#%% Regular feather operations
def mark_deleted_as_read(config, client_session):
    # Mark items that are in the JSON directory but with missing HTML file as read on the server
    if config.update_lock.exists():
        print("The previous synchronization was aborted, not marking any item as read in order to avoid collateral damage")
        return
    marked_as_read = 0
    to_mark_as_read = []
    for json_path in config.json_root.glob("*.json"):
        item_json = json.load(json_path.open("r"))
        html_path = config.html_root / item_json["html_path"]
        if not html_path.exists():
            to_mark_as_read.append(item_json["id"])
            # delete JSON file
            json_path.unlink()
            marked_as_read += 1
    for i in range(0, len(to_mark_as_read), config.items_per_query):
        client_session.mark_as_read(to_mark_as_read[i:i+config.items_per_query])
    print(f"Marked {marked_as_read} items as read")
def escape_filename(config, filename):
    return filename.translate(config.filename_translation)
def truncate_filename(config, filename):
    max_filename_length = config.max_filename_length
    filename_utf8 = filename.encode("utf-8")
    if len(filename_utf8) <= max_filename_length:
        return filename
    else:
        suffix = Path(filename).suffix
        max_basename_length = max_filename_length - len(suffix.encode("utf-8"))
        cutoff = len(filename.encode('utf-8')[:max_basename_length].decode('utf-8', errors="ignore"))
        return filename[:cutoff] + '' + suffix
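# For illustration (values hypothetical): with html.filename_replacement = { "/" = "_" }
# and html.max_filename_length = 32, an item titled "Foo/Bar: a very long article title"
# rendered through filename_template is first escaped to "Foo_Bar: a very long article title.html",
# then truncated so that the UTF-8 encoded name, suffix included, fits within 32 bytes
# without cutting a multi-byte character in half.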
def get_html_path(config, item_json):
    folder_directory = config.html_root
    for folder in item_json["folder"]["parents"]:
        folder_directory /= escape_filename(config, config.item_category_template.render(folder))
    folder_directory /= escape_filename(config, config.item_category_template.render(item_json["folder"]))
    folder_directory.mkdir(parents=True, exist_ok=True) # TODO move
    html_name = truncate_filename(config, escape_filename(config, config.item_filename_template.render(item_json)))
    return folder_directory / html_name
def format_datetime(config, timestamp):
    return datetime.fromtimestamp(timestamp, config.timezone).strftime(config.time_format)
def set_computed_fields_json(config, item_json):
    item_json["published_formatted"] = format_datetime(config, item_json["published"])
    item_json["updated_formatted"] = format_datetime(config, item_json["updated"])
    item_json["html_path"] = str(get_html_path(config, item_json).relative_to(config.html_root))
def synchronize_with_server(config, client_session):
    # Synchronize items from the server, generating and deleting JSON and HTML files accordingly
    config.update_lock.touch()
    print("Synchronizing with server...")
    new_items, updated_items = 0, 0
    grabbed_item_paths = []
    folders = client_session.list_folders()
    for category in folders:
        folder_path, folder_id = category.title, category.id
        print(f" Updating folder {folder_path}") # TODO fixme
        remaining, continuation = True, 0
        while remaining:
            articles = client_session.get_unread_articles_in_folder(folder_id, limit=config.items_per_query, continuation=continuation)
            if len(articles) >= config.items_per_query:
                continuation += len(articles)
            else:
                remaining = False
            for item in articles:
                item_json = item.asdict()
                item_json["folder"] = category.asdict()
                set_computed_fields_json(config, item_json)
                json_path = config.json_root / f"{ sha256(str(item_json["id"]).encode("utf-8")).hexdigest() }.json"
                grabbed_item_paths.append(json_path)
                write_files, updating = False, False
                if not json_path.exists():
                    write_files = True
                    new_items += 1
                else:
                    old_item_json = json.load(json_path.open("r"))
                    if item_json["updated"] > old_item_json["updated"]:
                        write_files, updating = True, True
                        updated_items += 1
                if write_files:
                    # write JSON
                    with json_path.open("w") as f:
                        json.dump(item_json, f)
                    # write HTML
                    generate_html_for_item(config, item_json, regenerate=updating)
    # Remove items that we didn't get from the server but are in the JSON directory
    removed_items = 0
    for item_path in config.json_root.glob("*.json"):
        if item_path not in grabbed_item_paths:
            # remove HTML
            item_json = json.load(item_path.open("r"))
            remove_html_for_item(config, item_json, ignore_deleted=True) # ignore if file was deleted by user during sync
            # remove JSON
            item_path.unlink()
            removed_items += 1
    print(f"Synchronization successful ({new_items} new items, {updated_items} updated, {removed_items} removed)")
    config.update_lock.unlink()
def generate_html_for_item(config, item_json, regenerate=False):
    # Write HTML file for a JSON object
    html_path = config.html_root / item_json["html_path"]
    if html_path.exists() and not regenerate:
        print(f"WARNING: a file already exists for {html_path}. Either the feed has duplicate entries, or something has gone terribly wrong.")
    else:
        with html_path.open("w") as f:
            f.write(config.item_template.render(item_json))
        # set accessed date to update time, modified to publication time
        os.utime(html_path, (item_json["updated"], item_json["published"]))
def remove_html_for_item(config, item_json, ignore_deleted=False):
    # Delete a HTML file for a JSON object
    html_path = config.html_root / item_json["html_path"]
    if not ignore_deleted or html_path.exists():
        html_path.unlink()
def remove_empty_html_directories(config):
    # Remove empty directories in the HTML directory
    html_root = config.html_root
    removed_directories = set()
    for (dirpath, dirnames, filenames) in html_root.walk(top_down=False):
        if dirpath != html_root:
            is_empty = len(filenames) == 0
            if is_empty and len(dirnames) > 0: # some subdirectories may have been removed in an earlier iteration
                for subdirname in dirnames:
                    if dirpath / subdirname not in removed_directories:
                        is_empty = False
                        break
            if is_empty:
                dirpath.rmdir()
                removed_directories.add(dirpath)
def synchronize(config, client_session):
    # Do a full feather update
    mark_deleted_as_read(config, client_session)
    synchronize_with_server(config, client_session)
    remove_empty_html_directories(config)
def synchronize_local_changes(config, client_session):
    # Upload local changes (read items) to the server
    mark_deleted_as_read(config, client_session)
    remove_empty_html_directories(config)
def synchronize_remote_changes(config, client_session):
    # Download remote changes (new items, items read from another device) from the server
    synchronize_with_server(config, client_session)
    remove_empty_html_directories(config)
async def daemon_sync_up_loop(config, client_session):
    while True:
        synchronize_local_changes(config, client_session)
        await asyncio.sleep(config.daemon_sync_up_every)
async def daemon_sync_down_loop(config, client_session):
    while True:
        synchronize_remote_changes(config, client_session)
        await asyncio.sleep(config.daemon_sync_down_every)
async def daemon(config, client_session):
    print(f"Started in daemon mode; changes will be downloaded from the server every {config.daemon_sync_down_every}s and uploaded every {config.daemon_sync_up_every}s")
    async with asyncio.TaskGroup() as tg:
        tup = tg.create_task(daemon_sync_up_loop(config, client_session))
        tdown = tg.create_task(daemon_sync_down_loop(config, client_session))
        def cancel_tasks():
            tup.cancel()
            tdown.cancel()
        asyncio.get_running_loop().add_signal_handler(signal.SIGTERM, cancel_tasks)
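# Note on shutdown: the two sync loops never return on their own, so the TaskGroup
# only exits once cancel_tasks() cancels both tasks on SIGTERM; Ctrl-C (SIGINT) is
# instead caught as KeyboardInterrupt around asyncio.run() in main() below.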
def regenerate_files(config):
    for json_path in config.json_root.glob("*.json"):
        item_json = json.load(json_path.open("r"))
        remove_html_for_item(config, item_json, ignore_deleted=True) # path might change so we preemptively remove the old file
        set_computed_fields_json(config, item_json) # recompute formatted datetime & path from the current configuration
        # rewrite JSON
        with json_path.open("w") as f:
            json.dump(item_json, f)
        # rewrite HTML
        generate_html_for_item(config, item_json, regenerate=True)
def clear_data(config):
    for json_path in config.json_root.glob("*.json"):
        item_json = json.load(json_path.open("r"))
        remove_html_for_item(config, item_json, ignore_deleted=True)
        json_path.unlink()
    remove_empty_html_directories(config)
#%% Run feather
def main():
    parser = argparse.ArgumentParser(
        prog="feather",
        description="file-based RSS reader client"
    )
    parser.add_argument(
        "action", choices=("sync", "sync-up", "sync-down", "daemon", "regenerate", "clear-data"),
        help="sync: perform a full synchronization with the server; sync-up: only synchronize local changes to the server (e.g. items read locally); sync-down: only synchronize remote changes from the server (e.g. new items or items read from another device); daemon: start in daemon mode (will keep performing synchronizations periodically until the process is stopped); regenerate: regenerate all HTML files from the local data; clear-data: remove all local data"
    )
    args = parser.parse_args()
    config = Config()
    if args.action == "sync":
        client_session = make_client_session(config)
        synchronize(config, client_session)
    elif args.action == "sync-up":
        client_session = make_client_session(config)
        synchronize_local_changes(config, client_session)
    elif args.action == "sync-down":
        client_session = make_client_session(config)
        synchronize_remote_changes(config, client_session)
    elif args.action == "daemon":
        client_session = make_client_session(config)
        try:
            asyncio.run(daemon(config, client_session))
        except KeyboardInterrupt:
            pass
    elif args.action == "regenerate":
        regenerate_files(config)
    elif args.action == "clear-data":
        clear_data(config)
if __name__ == "__main__":
    main()
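# Typical invocations (paths illustrative; CONFIG_PATH is optional and defaults to ./config.toml):
#   ./feather.py sync                                           # one-shot full synchronization
#   CONFIG_PATH=/etc/feather/config.toml ./feather.py daemon    # keep syncing until the process is stopped
#   ./feather.py regenerate                                     # rebuild the HTML tree from local data, e.g. after changing templates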