feather/main.py

#!/usr/bin/python3
import os
import re
import json
import google_reader
import tomllib
import sys
import argparse
import asyncio
import signal
from datetime import datetime
from zoneinfo import ZoneInfo
from pathlib import Path
from hashlib import sha256
from jinja2 import Template

#%% Configuration

class Config:
    def __init__(self):
        with open("config.default.toml", "rb") as f:
            default_config = tomllib.load(f)
        config_path = Path(os.environ.get("CONFIG_PATH") or "config.toml")
        if config_path.exists():
            with config_path.open("rb") as f:
                config = tomllib.load(f)
        elif "CONFIG_PATH" in os.environ:
            print(f"Configuration file {config_path} does not exist; create it or change the CONFIG_PATH environment variable to another path", file=sys.stderr)
            sys.exit(1)
        else:
            config = {}
        def get_config(category, field, can_default=True):
            env_name = f"{category.upper()}_{field.upper()}"
            c = config.get(category, {})
            if env_name in os.environ:
                return os.environ[env_name]
            elif field in c:
                return c[field]
            elif can_default:
                return default_config[category][field]
            else:
                print(f"Error while loading configuration: {category}.{field} not found in {config_path} nor in environment variable {env_name}", file=sys.stderr)
                sys.exit(1)
        # Get config fields
        self.html_root: Path = Path(get_config("directories", "reader"))
        self.json_root: Path = Path(get_config("directories", "data"))
        self.server_url: str = str(get_config("server", "url", False))
        self.server_user: str = str(get_config("server", "user", False))
        self.server_password: str = str(get_config("server", "password", False))
        self.items_per_query: int = int(get_config("server", "items_per_request"))
        self.timezone: ZoneInfo = ZoneInfo(str(get_config("datetime", "timezone")))
        self.time_format: str = str(get_config("datetime", "format"))
        self.item_template: Template = Template(str(get_config("html", "template")), autoescape=True)
        self.item_filename_template: Template = Template(str(get_config("html", "filename_template")), autoescape=False)
        self.max_filename_length: int = int(get_config("html", "max_filename_length"))
        self.filename_translation = str.maketrans(get_config("html", "filename_replacement"))
        self.daemon_sync_up_every: int = int(get_config("daemon", "sync_up_every"))
        self.daemon_sync_down_every: int = int(get_config("daemon", "sync_down_every"))
        # Computed config fields
        self.update_lock = self.json_root / "update.lock"
        # Create missing directories
        self.html_root.mkdir(exist_ok=True)
        self.json_root.mkdir(exist_ok=True)
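
# Example (illustrative; the [section] and key names come from the
# get_config() calls above, but the values are made up): a minimal
# config.toml overriding the required server fields.
#
#   [server]
#   url = "https://rss.example.com"
#   user = "alice"
#   password = "hunter2"
#
# Any field can also be supplied through the environment, e.g. SERVER_URL or
# DIRECTORIES_READER, and environment variables win over the file.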

#%% Interaction with server

label_name = re.compile("user/.*/label/(.*)")

class ClientSession:
    client: google_reader.Client
    auth_token: str
    csrf_token: str

    def __init__(self, config: Config):
        self.client = google_reader.Client(config.server_url)
        self.auth_token = self.client.login(config.server_user, config.server_password)
        self.csrf_token = self.client.get_token(self.auth_token)

    def mark_as_read(self, item_ids):
        self.client.edit_tags(self.auth_token, self.csrf_token, item_ids=item_ids, add_tags=[google_reader.STREAM_READ])

    def list_folders(self):
        folders = [tag for tag in self.client.list_tags(self.auth_token) if tag.type == "folder"]
        l = []
        for folder in folders:
            folder_name = folder.label or label_name.search(folder.id).group(1)
            folder_id = folder.id
            l.append((folder_name, folder_id))
        return l

    def get_stream_items_ids(self, *args, **kwargs):
        return self.client.get_stream_items_ids(self.auth_token, *args, **kwargs)

    def get_stream_items_contents(self, *args, **kwargs):
        return self.client.get_stream_items_contents(self.auth_token, self.csrf_token, *args, **kwargs)
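
# Auth flow note (as suggested by the calls above): login() exchanges the
# username/password for an auth token, and get_token() fetches the
# short-lived write token that Google Reader-style APIs expect on mutating
# endpoints such as edit_tags().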

#%% Regular feather operations

def mark_deleted_as_read(config, client_session):
    # Mark items that are in the JSON directory but with a missing HTML file as read on the server
    if config.update_lock.exists():
        print("The previous synchronization was aborted, not marking any item as read in order to avoid collateral damage")
        return
    marked_as_read = 0
    to_mark_as_read = []
    for json_path in config.json_root.glob("*.json"):
        item_json = json.load(json_path.open("r"))
        html_path = config.html_root / item_json["html_path"]
        if not html_path.exists():
            to_mark_as_read.append(item_json["id"])
            # delete JSON file
            json_path.unlink()
            marked_as_read += 1
    # Send the ids in batches; the slice size must match the loop step so no
    # ids are skipped between batches
    for i in range(0, len(to_mark_as_read), config.items_per_query):
        client_session.mark_as_read(to_mark_as_read[i:i+config.items_per_query])
    print(f"Marked {marked_as_read} items as read")

def escape_filename(config, filename):
    return filename.translate(config.filename_translation)

def truncate_filename(config, filename):
    max_filename_length = config.max_filename_length
    filename_utf8 = filename.encode("utf-8")
    if len(filename_utf8) <= max_filename_length:
        return filename
    else:
        suffix = Path(filename).suffix
        max_basename_length = max_filename_length - len(suffix.encode("utf-8"))
        # Cut on a valid UTF-8 boundary: decode(errors="ignore") drops any
        # trailing byte fragment left by the byte-level slice
        cutoff = len(filename.encode("utf-8")[:max_basename_length].decode("utf-8", errors="ignore"))
        return filename[:cutoff] + suffix
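
# Example: with max_filename_length = 16, "résumé-of-the-day.html" keeps its
# ".html" suffix and the stem is cut so the whole name fits in 16 bytes of
# UTF-8, never splitting a multi-byte character such as "é".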

def get_html_path(config, item_json):
    folder_directory = config.html_root / escape_filename(config, item_json["folder"])
    folder_directory.mkdir(exist_ok=True)
    html_name = truncate_filename(config, escape_filename(config, config.item_filename_template.render(item_json)))
    return folder_directory / html_name

def format_datetime(config, timestamp):
    return datetime.fromtimestamp(timestamp, config.timezone).strftime(config.time_format)

def set_computed_fields_json(config, item_json):
    item_json["published_formatted"] = format_datetime(config, item_json["published"])
    item_json["updated_formatted"] = format_datetime(config, item_json["updated"])
    item_json["html_path"] = str(get_html_path(config, item_json).relative_to(config.html_root))

def synchronize_with_server(config, client_session):
    # Synchronize items from the server, generating and deleting JSON and HTML files accordingly
    config.update_lock.touch()
    print("Synchronizing with server...")
    new_items, updated_items = 0, 0
    grabbed_item_paths = []
    folders = client_session.list_folders()
    for (folder_name, folder_id) in folders:
        print(f" Updating folder {folder_name}")
        def process(item_ids):
            nonlocal new_items, updated_items, grabbed_item_paths
            if len(item_ids) > 0:
                item_contents = client_session.get_stream_items_contents(item_ids=item_ids)
                for item_content in item_contents.items:
                    item_json = {
                        "id": item_content.id,
                        "folder": folder_name,
                        "title": item_content.title,
                        "published": item_content.published,
                        "updated": item_content.updated,
                        "author": item_content.author,
                        "summary": item_content.summary.content,
                        "content": item_content.content.content,
                        "origin_title": item_content.origin.title,
                        "origin_url": item_content.origin.html_url,
                        "canonical_url": item_content.canonical[0].href,
                    }
                    set_computed_fields_json(config, item_json)
                    json_path = config.json_root / f"{sha256(item_json['id'].encode('utf-8')).hexdigest()}.json"
                    grabbed_item_paths.append(json_path)
                    write_files, updating = False, False
                    if not json_path.exists():
                        write_files = True
                        new_items += 1
                    else:
                        old_item_json = json.load(json_path.open("r"))
                        if item_json["updated"] > old_item_json["updated"]:
                            write_files, updating = True, True
                            updated_items += 1
                    if write_files:
                        # write JSON
                        with json_path.open("w") as f:
                            json.dump(item_json, f)
                        # write HTML
                        generate_html_for_item(config, item_json, regenerate=updating)
        continuation = None
        while continuation != '':
            items = client_session.get_stream_items_ids(stream_id=folder_id, exclude_target="user/-/state/com.google/read", limit=config.items_per_query, continuation=continuation)
            item_ids = [item.id for item in items.item_refs]
            process(item_ids)
            continuation = items.continuation
    # Remove items that we didn't get from the server but are in the JSON directory
    removed_items = 0
    for item_path in config.json_root.glob("*.json"):
        if item_path not in grabbed_item_paths:
            # remove HTML
            item_json = json.load(item_path.open("r"))
            remove_html_for_item(config, item_json, ignore_deleted=True)  # ignore if file was deleted by user during sync
            # remove JSON
            item_path.unlink()
            removed_items += 1
    print(f"Synchronization successful ({new_items} new items, {updated_items} updated, {removed_items} removed)")
    config.update_lock.unlink()
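
# Pagination note (inferred from the loop above): each get_stream_items_ids
# call returns at most items_per_request ids plus a continuation token;
# feeding the token back fetches the next page, and an empty token ends the
# loop for that folder.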

def generate_html_for_item(config, item_json, regenerate=False):
    # Write HTML file for a JSON object
    html_path = config.html_root / item_json["html_path"]
    if html_path.exists() and not regenerate:
        print(f"WARNING: a file already exists for {html_path}. Either the feed has duplicate entries, or something has gone terribly wrong.")
    else:
        with html_path.open("w") as f:
            f.write(config.item_template.render(item_json))
        # set accessed date to update time, modified to publication time
        # (max() guards against feeds whose updated timestamp predates published)
        os.utime(html_path, (max(item_json["updated"], item_json["published"]), item_json["published"]))

def remove_html_for_item(config, item_json, ignore_deleted=False):
    # Delete the HTML file for a JSON object
    html_path = config.html_root / item_json["html_path"]
    if not ignore_deleted or html_path.exists():
        html_path.unlink()

def remove_empty_html_directories(config):
    # Remove empty directories in the HTML directory
    html_root = config.html_root
    for (dirpath, dirnames, filenames) in html_root.walk(top_down=False):  # Path.walk requires Python 3.12+
        if dirpath != html_root:
            if len(dirnames) == 0 and len(filenames) == 0:
                dirpath.rmdir()

def synchronize(config, client_session):
    # Do a full feather update
    mark_deleted_as_read(config, client_session)
    synchronize_with_server(config, client_session)
    remove_empty_html_directories(config)

def synchronize_local_changes(config, client_session):
    # Upload local changes (read items) to the server
    mark_deleted_as_read(config, client_session)
    remove_empty_html_directories(config)

def synchronize_remote_changes(config, client_session):
    # Download remote changes (new items, items read from another device) from the server
    synchronize_with_server(config, client_session)
    remove_empty_html_directories(config)

async def daemon_sync_up_loop(config, client_session):
    while True:
        synchronize_local_changes(config, client_session)
        await asyncio.sleep(config.daemon_sync_up_every)

async def daemon_sync_down_loop(config, client_session):
    while True:
        synchronize_remote_changes(config, client_session)
        await asyncio.sleep(config.daemon_sync_down_every)

async def daemon(config, client_session):
    print(f"Started in daemon mode; changes will be downloaded from the server every {config.daemon_sync_down_every}s and uploaded every {config.daemon_sync_up_every}s")
    async with asyncio.TaskGroup() as tg:
        tup = tg.create_task(daemon_sync_up_loop(config, client_session))
        tdown = tg.create_task(daemon_sync_down_loop(config, client_session))
        def cancel_tasks():
            tup.cancel()
            tdown.cancel()
        # Register the handler inside the TaskGroup block so it is installed
        # before the group starts waiting; a SIGTERM then cancels both loops
        # and lets the group exit
        asyncio.get_running_loop().add_signal_handler(signal.SIGTERM, cancel_tasks)
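
# Shutdown note: SIGTERM (e.g. from a service manager) goes through
# cancel_tasks() above, while Ctrl-C surfaces as KeyboardInterrupt and is
# swallowed by the try/except around asyncio.run() in main().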

def regenerate_files(config):
    for json_path in config.json_root.glob("*.json"):
        item_json = json.load(json_path.open("r"))
        remove_html_for_item(config, item_json, ignore_deleted=True)  # path might change so we preemptively remove the old file
        set_computed_fields_json(config, item_json)  # recompute formatted datetime & path from the current configuration
        # rewrite JSON
        with json_path.open("w") as f:
            json.dump(item_json, f)
        # rewrite HTML
        generate_html_for_item(config, item_json, regenerate=True)

def clear_data(config):
    for json_path in config.json_root.glob("*.json"):
        item_json = json.load(json_path.open("r"))
        remove_html_for_item(config, item_json, ignore_deleted=True)
        json_path.unlink()
    remove_empty_html_directories(config)

#%% Run feather

def main():
    parser = argparse.ArgumentParser(
        prog="feather",
        description="file-based RSS reader"
    )
    parser.add_argument(
        "action", choices=("sync", "sync-up", "sync-down", "daemon", "regenerate", "clear-data"),
        help="sync: perform a full synchronization with the server; sync-up: only synchronize local changes to the server (e.g. items read locally); sync-down: only synchronize remote changes from the server (e.g. new items or items read from another device); daemon: start in daemon mode (will keep performing synchronizations periodically until the process is stopped); regenerate: regenerate all HTML files from the local data; clear-data: remove all local data"
    )
    args = parser.parse_args()
    config = Config()
    if args.action == "sync":
        client_session = ClientSession(config)
        synchronize(config, client_session)
    elif args.action == "sync-up":
        client_session = ClientSession(config)
        synchronize_local_changes(config, client_session)
    elif args.action == "sync-down":
        client_session = ClientSession(config)
        synchronize_remote_changes(config, client_session)
    elif args.action == "daemon":
        client_session = ClientSession(config)
        try:
            asyncio.run(daemon(config, client_session))
        except KeyboardInterrupt:
            pass
    elif args.action == "regenerate":
        regenerate_files(config)
    elif args.action == "clear-data":
        clear_data(config)

if __name__ == "__main__":
    main()
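
# Example invocations (sketch; paths depend on your setup):
#   ./main.py sync                                # one-shot full synchronization
#   ./main.py daemon                              # keep syncing until stopped
#   CONFIG_PATH=/etc/feather.toml ./main.py regenerate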