
refactor: everything into several files and a valid python package

Étienne Fildadut 2025-10-10 23:55:56 +02:00
parent 58e8a14b93
commit b0e0c5d0df
15 changed files with 646 additions and 1227 deletions
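Based on the paths touched below, this commit replaces the flat feather.py and vendored google_reader.py scripts with a src layout. A summary of the new package (the location of config.default.toml is inferred from the `Path(__file__).parent` lookup in src/feather/config.py):

src/feather/
├── __init__.py
├── articledata.py        # Article/Category models, JSON + HTML files on disk
├── cli.py                # argparse entry point (feather.cli:main)
├── config.py             # configuration loading (config.toml + environment)
├── config.default.toml   # built-in defaults
├── feather.py            # FeatherApp: sync, daemon, regenerate, clear-data
└── feedreaderclient.py   # GReaderSession / TTRSession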

.dockerignore Normal file

@@ -0,0 +1,20 @@
# Git
.git
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info
# Virtual environments
.venv
# Feather config file
config.toml
# Feather runtime files
reader/
data/

.gitignore vendored

@@ -9,9 +9,9 @@ wheels/
 # Virtual environments
 .venv
-# Config file
+# Feather config file
 config.toml
-# Runtime files
+# Feather runtime files
 reader/
 data/

Dockerfile

@@ -1,10 +1,9 @@
-FROM docker.io/alpine:3.22
+FROM ghcr.io/astral-sh/uv:alpine3.22
-RUN apk add --no-cache python3 py3-requests py3-jinja2 py3-tzdata
 ENV PYTHONUNBUFFERED=1
 RUN mkdir /feather
+COPY . /feather
 WORKDIR /feather
+RUN uv sync --locked
-COPY *.py config.default.toml LICENSE /feather/
-ENTRYPOINT [ "python3", "-u", "feather.py" ]
+ENTRYPOINT [ "uv", "run", "feather" ]

README.md

@@ -40,11 +40,13 @@ After changing the configuration, you can call `feather regenerate` to regenerat
 ### Docker
 `podman run -d -v ./config.toml:/feather/config.toml -v feather-data:/feather/data -v ./reader:/feather/reader --name feather feather daemon`
+`docker run -d -v ./config.toml:/feather/config.toml -v feather-data:/feather/data -v ./reader:/feather/reader --name feather feather daemon`
 ### Raw
-You need Python 3.12 or newer. Then pip it up.
+You need Python 3.12 or newer. Then pip it up, as the kids say.
+`uv run feather`
 ## FAQ
@@ -57,18 +59,9 @@ You need Python 3.12 or newer. Then pip it up.
 ## TODO
 - [ ] Write documentation
-- [x] Perform mark-as-read operation more often than sync (inotify, daemon, etc.)
 - [ ] inotify might still be nice for instant reactions
-- [x] Make HTML filename configurable
-- [x] Make HTML template configurable
 - [ ] Nested categories: ttrss-python?
 - [ ] Use inotify for real-time article mark-as-read action
 - [ ] Share the fun somewhere
-- [x] Edge cases: mark as read during sync (if marked as read on server or not)
-- [x] Proper filename escaping
-- [x] Command to force regenerate all HTML files (incl. recompute datetimes & paths)
-- [x] Handle item updates
 - [ ] Actually think about the issues created by the duplicate warning
-- [x] Set generated files creation/modification date instead of putting date in filename
-- [ ] Make a proper Python package
 - [ ] Attachments
 - [ ] Test with FreshRSS

feather.py Deleted file

@@ -1,508 +0,0 @@
#!/usr/bin/python3
from __future__ import annotations

import os
import re
import json
import tomllib
import sys
import argparse
import asyncio
import signal
from abc import ABC, abstractmethod
from datetime import datetime
from zoneinfo import ZoneInfo
from pathlib import Path
from hashlib import sha256

from jinja2 import Template
from ttrss.client import TTRClient

import google_reader

#%% Configuration

class Config:
    def __init__(self):
        with open("config.default.toml", "rb") as f:
            default_config = tomllib.load(f)
        config_path = Path(os.environ.get("CONFIG_PATH") or "config.toml")
        if config_path.exists():
            with config_path.open("rb") as f:
                config = tomllib.load(f)
        elif "CONFIG_PATH" in os.environ:
            print(f"Configuration file {config_path} does not exist; create it or change the CONFIG_PATH environment variable to another path")
            exit(1)
        else:
            config = {}

        def get_config(category, field, can_default=True):
            env_name = f"{category.upper()}_{field.upper()}"
            c = config.get(category, {})
            if env_name in os.environ:
                return os.environ[env_name]
            elif field in c:
                return c[field]
            elif can_default:
                return default_config[category][field]
            else:
                print(f"Error while loading configuration: {category}.{field} not found in {config_path} nor in environment variable {env_name}", file=sys.stderr)
                exit(1)

        # Get config fields
        self.html_root: Path = Path(get_config("directories", "reader"))
        self.json_root: Path = Path(get_config("directories", "data"))
        self.server_api: str = str(get_config("server", "api"))
        self.server_url: str = str(get_config("server", "url", False))
        self.server_user: str = str(get_config("server", "user", False))
        self.server_password: str = str(get_config("server", "password", False))
        self.items_per_query: int = int(get_config("server", "items_per_request"))
        self.timezone: ZoneInfo = ZoneInfo(str(get_config("datetime", "timezone")))
        self.time_format: str = str(get_config("datetime", "format"))
        self.item_template: Template = Template(str(get_config("html", "template")), autoescape=True)
        self.item_filename_template: Template = Template(str(get_config("html", "filename_template")), autoescape=False)
        self.item_category_template: Template = Template(str(get_config("html", "category_template")), autoescape=False)
        self.max_filename_length: int = int(get_config("html", "max_filename_length"))
        self.filename_translation = str.maketrans(get_config("html", "filename_replacement"))
        self.daemon_sync_up_every: int = int(get_config("daemon", "sync_up_every"))
        self.daemon_sync_down_every: int = int(get_config("daemon", "sync_down_every"))
        # Computed config fields
        self.update_lock = self.json_root / "update.lock"
        # Create missing directories
        self.html_root.mkdir(exist_ok=True)
        self.json_root.mkdir(exist_ok=True)

#%% Interaction with server

type Id = int | str

class Article(ABC):
    id: Id
    title: str = ""
    published: int = 0
    updated: int = 0
    author: str = ""
    summary: str = ""
    content: str = ""
    feed_title: str = ""
    feed_url: str = ""
    feed_icon_url: str = ""
    feed_order: int = 0
    article_url: str = ""
    comments_url: str = ""
    language: str = ""
    image_url: str = ""

    def asdict(self):
        return {
            "id": self.id,
            "title": self.title,
            "published": self.published,
            "updated": self.updated,
            "author": self.author,
            "summary": self.summary,
            "content": self.content,
            "feed_title": self.feed_title,
            "feed_url": self.feed_url,
            "feed_icon_url": self.feed_icon_url,
            "feed_order": self.feed_order,
            "article_url": self.article_url,
            "comments_url": self.comments_url,
            "language": self.language,
            "image_url": self.image_url,
        }

class GReaderArticle(Article):
    def __init__(self, session: GReaderSession, item_content):
        self.id = item_content.id
        self.title = item_content.title
        self.published = item_content.published
        self.updated = item_content.updated
        self.author = item_content.author
        self.summary = item_content.summary.content
        self.content = item_content.content.content
        self.feed_title = item_content.origin.title
        self.feed_url = item_content.origin.html_url
        self.article_url = item_content.canonical[0].href

class TTRArticle(Article):
    def __init__(self, session: TRRSession, article):
        self.id = article.id
        self.title = article.title
        self.published = article.updated.timestamp()
        self.updated = article.updated.timestamp()
        self.author = article.author
        self.summary = article.excerpt
        self.content = article.content
        self.feed_title = article.feed_title
        self.feed_url = article.site_url
        self.feed_icon_url = session.feeds[article.feed_id]["icon"]
        self.feed_order = session.feeds[article.feed_id]["order"]
        self.article_url = article.link
        self.comments_url = article.comments_link
        self.language = article.lang
        self.image_url = article.flavor_image

class Category:
    id: Id
    title: str
    parents: list[Category]
    order: int = 0

    def __init__(self, id, title, parents=[], order=0):
        self.id = id
        self.title = title
        self.parents = parents
        self.order = order

    def asdict(self):
        return {
            "id": self.id,
            "title": self.title,
            "parents": [ dir.asdict() for dir in self.parents ],
            "order": self.order
        }

class ClientSession(ABC):
    @abstractmethod
    def mark_as_read(self, item_ids: list[Id]):
        """
        Mark all the given articles as read.
        """
        pass

    @abstractmethod
    def list_categories(self) -> list[Category]:
        """
        Returns a list of all the categories on the server.
        """
        pass

    @abstractmethod
    def get_unread_articles_in_category(self, category_id: Id, limit: int, continuation: int=0) -> list[Article]:
        """
        Returns a list of Articles in the given category. limit and continuation are required for pagination.
        """
        pass

label_name = re.compile("user/.*/label/(.*)")

class GReaderSession(ClientSession):
    greader: google_reader.Client
    auth_token: str
    csrf_token: str

    def __init__(self, config: Config):
        self.greader = google_reader.Client(config.server_url)
        self.auth_token = self.greader.login(config.server_user, config.server_password)
        self.csrf_token = self.greader.get_token(self.auth_token)

    def mark_as_read(self, item_ids: list[Id]):
        self.greader.edit_tags(self.auth_token, self.csrf_token, item_ids=item_ids, add_tags=[google_reader.STREAM_READ])

    def list_categories(self):
        categories = [tag for tag in self.greader.list_tags(self.auth_token) if tag.type == "folder"]
        l = []
        for category in categories:
            category_name = category.label or label_name.search(category.id).group(1)
            category_id = category.id
            l.append(Category(id=category_id, title=category_name))
        return l

    def get_unread_articles_in_category(self, category_id, limit=500, continuation=0):
        items_ids = self.greader.get_stream_items_ids(self.auth_token, stream_id=category_id, exclude_target="user/-/state/com.google/read", limit=limit, continuation=continuation)
        item_contents = self.greader.get_stream_items_contents(self.auth_token, self.csrf_token, item_ids=[item.id for item in items_ids.item_refs])
        return [ GReaderArticle(self, item_content) for item_content in item_contents.items ]

class TRRSession(ClientSession):
    ttrss: TTRClient
    feeds: dict

    def __init__(self, config: Config):
        self.ttrss = TTRClient(config.server_url, config.server_user, config.server_password, auto_login=True)
        self.ttrss.login()
        self.feeds = {}

    def mark_as_read(self, item_ids):
        self.ttrss.mark_read(item_ids)

    def list_categories(self):
        self.feeds = {}

        def get_categories_recursive(parent_category, parent_categories=[]):
            categories = []
            index = 0
            for item in parent_category["items"]:
                # skip special categories and feeds
                if item["bare_id"] <= 0:
                    continue
                # category
                elif item.get("type") == "category":
                    category = Category(id=item["bare_id"], parents=parent_categories, title=item["name"], order=index)
                    categories.append(category)
                    categories += get_categories_recursive(item, parent_categories+[category])
                # feeds
                elif "type" not in item:
                    self.feeds[item["bare_id"]] = item
                    self.feeds[item["bare_id"]]["order"] = index
                index += 1
            return categories

        tree = self.ttrss.get_feed_tree()
        return get_categories_recursive(tree["categories"])

    def get_unread_articles_in_category(self, category_id, limit=100, continuation=0):
        headlines = self.ttrss.get_headlines(feed_id=category_id, limit=limit, skip=continuation, is_cat=True, show_excerpt=True, show_content=True, view_mode="unread", include_attachments=True, include_nested=False)
        return [ TTRArticle(self, headline) for headline in headlines ]

def make_client_session(config: Config) -> ClientSession:
    api = config.server_api
    if api == "googlereader":
        return GReaderSession(config)
    elif api == "ttrss":
        return TRRSession(config)
    else:
        print("Configuration error: server.api must be either ttrss or googlereader", file=sys.stderr)
        exit(1)

#%% Regular feather operations

def mark_deleted_as_read(config, client_session):
    # Mark items that are in the JSON directory but with missing HTML file as read on the server
    if config.update_lock.exists():
        print("The previous synchronization was aborted, not marking any item as read in order to avoid collateral damage")
        return
    marked_as_read = 0
    to_mark_as_read = []
    for json_path in config.json_root.glob("*.json"):
        item_json = json.load(json_path.open("r"))
        html_path = config.html_root / item_json["html_path"]
        if not html_path.exists():
            to_mark_as_read.append(item_json["id"])
            # delete JSON file
            json_path.unlink()
            marked_as_read += 1
    for i in range(0, len(to_mark_as_read), config.items_per_query):
        client_session.mark_as_read(to_mark_as_read[i:i+config.items_per_query])
    print(f"Marked {marked_as_read} items as read")

def escape_filename(config, filename):
    return filename.translate(config.filename_translation)

def truncate_filename(config, filename):
    max_filename_length = config.max_filename_length
    filename_utf8 = filename.encode("utf-8")
    if len(filename_utf8) <= max_filename_length:
        return filename
    else:
        suffix = Path(filename).suffix
        max_basename_length = max_filename_length - len(suffix.encode("utf-8"))
        cutoff = len(filename.encode('utf-8')[:max_basename_length].decode('utf-8', errors="ignore"))
        return filename[:cutoff] + '' + suffix

def get_html_path(config, item_json):
    category_directory = config.html_root
    for category in item_json["category"]["parents"]:
        category_directory /= escape_filename(config, config.item_category_template.render(category))
    category_directory /= escape_filename(config, config.item_category_template.render(item_json["category"]))
    category_directory.mkdir(parents=True, exist_ok=True) # TODO move
    html_name = truncate_filename(config, escape_filename(config, config.item_filename_template.render(item_json)))
    return category_directory / html_name

def format_datetime(config, timestamp):
    return datetime.fromtimestamp(timestamp, config.timezone).strftime(config.time_format)

def set_computed_fields_json(config, item_json): # TODO: clean
    item_json["published_formatted"] = format_datetime(config, item_json["published"])
    item_json["updated_formatted"] = format_datetime(config, item_json["updated"])
    item_json["html_path"] = str(get_html_path(config, item_json).relative_to(config.html_root))

def synchronize_with_server(config, client_session):
    # Synchronize items from the server, generating and deleting JSON and HTML files accordingly
    config.update_lock.touch()
    print("Synchronizing with server...")
    new_items, updated_items = 0, 0
    grabbed_item_paths = []
    categories = client_session.list_categories()
    for category in categories:
        print(f" Updating category {category.title}")
        remaining, continuation = True, 0
        while remaining:
            articles = client_session.get_unread_articles_in_category(category.id, limit=config.items_per_query, continuation=continuation)
            if len(articles) >= config.items_per_query:
                continuation += len(articles)
            else:
                remaining = False
            for item in articles:
                item_json = item.asdict()
                item_json["category"] = category.asdict()
                set_computed_fields_json(config, item_json)
                json_path = config.json_root / f"{ sha256(str(item_json["id"]).encode("utf-8")).hexdigest() }.json"
                grabbed_item_paths.append(json_path)
                write_files, updating = False, False
                if not json_path.exists():
                    write_files = True
                    new_items += 1
                else:
                    old_item_json = json.load(json_path.open("r"))
                    if item_json["updated"] > old_item_json["updated"]:
                        write_files, updating = True, True
                        updated_items += 1
                if write_files:
                    # write JSON
                    with json_path.open("w") as f:
                        json.dump(item_json, f)
                    # write HTML
                    generate_html_for_item(config, item_json, regenerate=updating)
    # Remove items that we didn't get from the server but are in the JSON directory
    removed_items = 0
    for item_path in config.json_root.glob("*.json"):
        if item_path not in grabbed_item_paths:
            # remove HTML
            item_json = json.load(item_path.open("r"))
            remove_html_for_item(config, item_json, ignore_deleted=True) # ignore if file was deleted by user during sync
            # remove JSON
            item_path.unlink()
            removed_items += 1
    print(f"Synchronization successful ({new_items} new items, {updated_items} updated, {removed_items} removed)")
    config.update_lock.unlink()

def generate_html_for_item(config, item_json, regenerate=False):
    # Write HTML file for a JSON object
    html_path = config.html_root / item_json["html_path"]
    if html_path.exists() and not regenerate:
        print(f"WARNING: a file already exists for {html_path}. Either the feed has duplicate entries, or something has gone terribly wrong.")
    else:
        with html_path.open("w") as f:
            f.write(config.item_template.render(item_json))
        # set accessed date to update time, modified to publication time
        os.utime(html_path, (max(item_json["updated"], item_json["updated"]), item_json["published"]))

def remove_html_for_item(config, item_json, ignore_deleted=False):
    # Delete a HTML file for a JSON object
    html_path = config.html_root / item_json["html_path"]
    if not ignore_deleted or html_path.exists():
        html_path.unlink()

def remove_empty_html_directories(config):
    # Remove empty directories in the HTML directory
    html_root = config.html_root
    removed_directories = set()
    for (dirpath, dirnames, filenames) in html_root.walk(top_down=False):
        if dirpath != html_root:
            is_empty = len(filenames) == 0
            if is_empty and len(dirnames) > 0: # some subdirectories may have been removed in an earlier iteration
                for subdirname in dirnames:
                    if dirpath / subdirname not in removed_directories:
                        is_empty = False
                        break
            if is_empty:
                dirpath.rmdir()
                removed_directories.add(dirpath)

def synchronize(config, client_session):
    # Do a full feather update
    mark_deleted_as_read(config, client_session)
    synchronize_with_server(config, client_session)
    remove_empty_html_directories(config)

def synchronize_local_changes(config, client_session):
    # Upload local changes (read items) to the server
    mark_deleted_as_read(config, client_session)
    remove_empty_html_directories(config)

def synchronize_remote_changes(config, client_session):
    # Download remote changes (new items, items read from another device) from the server
    synchronize_with_server(config, client_session)
    remove_empty_html_directories(config)

async def daemon_sync_up_loop(config, client_session):
    while True:
        synchronize_local_changes(config, client_session)
        await asyncio.sleep(config.daemon_sync_up_every)

async def daemon_sync_down_loop(config, client_session):
    while True:
        synchronize_remote_changes(config, client_session)
        await asyncio.sleep(config.daemon_sync_down_every)

async def daemon(config, client_session):
    print(f"Started in daemon mode; changes will be downloaded from the server every {config.daemon_sync_down_every}s and uploaded every {config.daemon_sync_up_every}s")
    async with asyncio.TaskGroup() as tg:
        tup = tg.create_task(daemon_sync_up_loop(config, client_session))
        tdown = tg.create_task(daemon_sync_down_loop(config, client_session))
        def cancel_tasks():
            tup.cancel()
            tdown.cancel()
        asyncio.get_running_loop().add_signal_handler(signal.SIGTERM, cancel_tasks)

def regenerate_files(config):
    for json_path in config.json_root.glob("*.json"):
        item_json = json.load(json_path.open("r"))
        remove_html_for_item(config, item_json, ignore_deleted=True) # path might change so we preemptively remove the old file
        set_computed_fields_json(config, item_json) # recompute formatted datetime & path from the current configuration
        # rewrite JSON
        with json_path.open("w") as f:
            json.dump(item_json, f)
        # rewrite HTML
        generate_html_for_item(config, item_json, regenerate=True)

def clear_data(config):
    for json_path in config.json_root.glob("*.json"):
        item_json = json.load(json_path.open("r"))
        remove_html_for_item(config, item_json, ignore_deleted=True)
        json_path.unlink()
    remove_empty_html_directories(config)

#%% Run feather

def main():
    parser = argparse.ArgumentParser(
        prog="feather",
        description="file-based RSS reader client"
    )
    parser.add_argument(
        "action", choices=("sync", "sync-up", "sync-down", "daemon", "regenerate", "clear-data"),
        help="sync: perform a full synchronization with the server; sync-up: only synchronize local changes to the server (e.g. items read locally); sync-down: only synchronize remote changes from the server (e.g. new items or items read from another device); daemon: start in daemon mode (will keep performing synchronizations periodically until process is stopped); regenerate: regenerate all HTML files from the local data; clear-data: remove all local data"
    )
    args = parser.parse_args()
    config = Config()
    if args.action == "sync":
        client_session = make_client_session(config)
        synchronize(config, client_session)
    elif args.action == "sync-up":
        client_session = make_client_session(config)
        synchronize_local_changes(config, client_session)
    elif args.action == "sync-down":
        client_session = make_client_session(config)
        synchronize_remote_changes(config, client_session)
    elif args.action == "daemon":
        client_session = make_client_session(config)
        try:
            asyncio.run(daemon(config, client_session))
        except KeyboardInterrupt:
            pass
    elif args.action == "regenerate":
        regenerate_files(config)
    elif args.action == "clear-data":
        clear_data(config)

if __name__ == "__main__":
    main()

google_reader.py Deleted file

@@ -1,696 +0,0 @@
"""
Taken from https://github.com/miniflux/google-reader (commit 4adba81).
Performed small modifications until TT-RSS/FreshAPI stopped complaining.
TODO: properly look into the spec to see who between FreshAPI and this library is wrong and PR
License:
MIT License
Copyright (c) 2025 Frédéric Guillot
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
from dataclasses import dataclass
from typing import Literal
import requests
# Streams can be feeds, tags (folders) or system types.
STREAM_FEED = "feed/{feed_id}"
STREAM_TAG = "user/-/label/{label_title}"
STREAM_READ = "user/-/state/com.google/read"
STREAM_STARRED = "user/-/state/com.google/starred"
STREAM_KEPT_UNREAD = "user/-/state/com.google/kept-unread"
STREAM_BROADCAST = "user/-/state/com.google/broadcast"
STREAM_READING_LIST = "user/-/state/com.google/reading-list"
class ClientError(Exception):
"""Base class for Google Reader API errors."""
pass
class AuthenticationError(ClientError):
"""Raised when authentication fails."""
def __init__(self, message: str):
super().__init__(message)
class ResourceNotFoundError(ClientError):
"""Raised when a resource is not found."""
def __init__(self, message: str):
super().__init__(message)
@dataclass(frozen=True)
class AuthToken:
TokenType: str
AccessToken: str
@dataclass(frozen=True)
class UserInfo:
user_id: str
user_name: str
user_email: str
user_profile_id: str
@dataclass(frozen=True)
class Tag:
id: str
label: str | None = None
type: str | None = None
@dataclass(frozen=True)
class Subscription:
id: str
title: str
url: str
html_url: str
icon_url: str
categories: list[Tag]
@dataclass(frozen=True)
class ItemRef:
id: str
@dataclass(frozen=True)
class StreamIDs:
item_refs: list[ItemRef]
continuation: str | None
@dataclass(frozen=True)
class ContentHREF:
href: str
@dataclass(frozen=True)
class ContentHREFType:
href: str
type: str
@dataclass(frozen=True)
class ContentItemEnclosure:
url: str
type: str
@dataclass(frozen=True)
class ContentItemContent:
direction: str
content: str
@dataclass(frozen=True)
class ContentItemOrigin:
stream_id: str
title: str
html_url: str
@dataclass(frozen=True)
class ContentItem:
id: str
categories: list[str]
title: str
crawl_time_msec: str
timestamp_usec: str
published: int
updated: int
author: str
alternate: list[ContentHREFType]
summary: ContentItemContent
content: ContentItemContent
origin: ContentItemOrigin
enclosure: list[ContentItemEnclosure]
canonical: list[ContentHREF]
@dataclass(frozen=True)
class StreamContentItems:
direction: str
id: str
title: str
self: list[ContentHREF]
updated: int
items: list[ContentItem]
author: str
@dataclass(frozen=True)
class QuickAddSubscription:
query: str
num_results: int
stream_id: str
stream_name: str
class Client:
"""
Client for interacting with the Google Reader API.
"""
def __init__(
self, base_url: str, session: requests.Session | None = None, user_agent: str = "Google Reader Python Client"
):
"""
Initialize a new Google Reader API Client.
Args:
base_url: Base URL of the Miniflux instance (e.g., "https://reader.miniflux.app")
session: Optional requests.Session object for making HTTP requests.
user_agent: User agent string for the HTTP requests.
"""
self._base_url = base_url.rstrip("/")
self._session = session or requests.Session()
self._session.headers.update({"User-Agent": user_agent})
def login(self, username: str, password: str) -> AuthToken:
"""
Log in to the Google Reader API.
Args:
username: Username for the Google Reader account.
password: Password for the Google Reader account.
"""
response = self._session.post(
f"{self._base_url}/accounts/ClientLogin", data={"Email": username, "Passwd": password}
)
if response.status_code != 200:
raise AuthenticationError("Authentication failed")
auth_data = {}
for line in response.text.strip().split("\n"):
key, value = line.split("=", 1)
auth_data[key] = value
auth_token = auth_data.get("Auth")
if not auth_token:
raise AuthenticationError("No Auth token found in response")
return AuthToken(TokenType="GoogleLogin", AccessToken=auth_token)
def get_token(self, auth: AuthToken) -> str:
"""
Get the authentication token.
Args:
auth(AuthToken): Authentication token obtained from the login process.
Returns:
str: Authentication token.
Raises:
ClientError: If the request fails or the response is not valid.
AuthenticationError: If the authentication token is invalid.
"""
response = self._session.get(
f"{self._base_url}/reader/api/0/token",
headers={"Authorization": f"{auth.TokenType} auth={auth.AccessToken}"},
)
if response.status_code == 401:
raise AuthenticationError("Authentication failed")
elif response.status_code != 200:
raise ClientError("Failed to get token")
return response.text.strip()
def get_user_info(self, auth: AuthToken) -> UserInfo:
"""
Get user information from the Google Reader API.
Args:
auth(AuthToken): Authentication token obtained from the login process.
Returns:
UserInfo: User information object containing user ID, name, email, and profile ID.
Raises:
ClientError: If the request fails or the response is not valid.
AuthenticationError: If the authentication token is invalid.
"""
response = self._session.get(
f"{self._base_url}/reader/api/0/user-info",
headers={"Authorization": f"{auth.TokenType} auth={auth.AccessToken}"},
)
if response.status_code == 401:
raise AuthenticationError("Authentication failed")
elif response.status_code != 200:
raise ClientError("Failed to get user info")
user_info = response.json()
return UserInfo(
user_id=user_info.get("userId", ""),
user_name=user_info.get("userName", ""),
user_email=user_info.get("userEmail", ""),
user_profile_id=user_info.get("userProfileId", ""),
)
def list_subscriptions(self, auth: AuthToken) -> list[Subscription]:
"""
Get the list of subscriptions from the Google Reader API.
Args:
auth(AuthToken): Authentication token obtained from the login process.
Returns:
List of Subscription objects.
Raises:
ClientError: If the request fails or the response is not valid.
AuthenticationError: If the authentication token is invalid.
"""
response = self._session.get(
f"{self._base_url}/reader/api/0/subscription/list",
headers={"Authorization": f"{auth.TokenType} auth={auth.AccessToken}"},
params={"output": "json"},
)
if response.status_code == 401:
raise AuthenticationError("Authentication failed")
elif response.status_code != 200:
raise ClientError("Failed to get subscriptions")
return [
Subscription(
id=sub.get("id", ""),
title=sub.get("title", ""),
url=sub.get("url", ""),
html_url=sub.get("htmlUrl", ""),
icon_url=sub.get("iconUrl", ""),
categories=[Tag(**cat) for cat in sub.get("categories", [])],
)
for sub in response.json().get("subscriptions", [])
]
def edit_subscription(
self,
auth: AuthToken,
csrf_token: str,
subscription_id: str,
action: Literal["edit", "subscribe", "unsubscribe"],
remove_label_id: str | None = None,
add_label_id: str | None = None,
title: str | None = None,
) -> bool:
"""
Edit a subscription.
Args:
auth(AuthToken): Authentication token obtained from the login process.
csrf_token(str): CSRF token for the request.
subscription_id(str): ID of the subscription to edit.
action(str): Action to perform on the subscription (edit, subscribe, unsubscribe).
remove_label_id(str): Label to remove from the subscription.
add_label_id(str): Label to add to the subscription.
title(str): New title for the subscription.
Returns:
bool: True if the operation was successful, False otherwise.
Raises:
ClientError: If the request fails or the response is not valid.
AuthenticationError: If the authentication token is invalid.
"""
data = {"s": subscription_id, "ac": action, "T": csrf_token}
if remove_label_id:
data["r"] = remove_label_id
if add_label_id:
data["a"] = add_label_id
if title:
data["t"] = title
response = self._session.post(
f"{self._base_url}/reader/api/0/subscription/edit",
headers={"Authorization": f"{auth.TokenType} auth={auth.AccessToken}"},
data=data,
)
if response.status_code == 401:
raise AuthenticationError("Authentication failed")
elif response.status_code != 200:
raise ClientError("Failed to edit subscription")
return True
def quick_add_subscription(self, auth: AuthToken, csrf_token: str, url: str) -> QuickAddSubscription:
"""
Quick add a subscription.
Args:
auth(AuthToken): Authentication token obtained from the login process.
csrf_token(str): CSRF token for the request.
url(str): URL of the subscription to add.
Returns:
QuickAddSubscription: Object containing the result of the quick add operation.
Raises:
ClientError: If the request fails or the response is not valid.
AuthenticationError: If the authentication token is invalid.
"""
response = self._session.post(
f"{self._base_url}/reader/api/0/subscription/quickadd",
headers={"Authorization": f"{auth.TokenType} auth={auth.AccessToken}"},
params={"output": "json"},
data={"quickadd": url, "T": csrf_token},
)
if response.status_code == 401:
raise AuthenticationError("Authentication failed")
elif response.status_code != 200:
raise ClientError("Failed to quick add subscription")
response = response.json()
return QuickAddSubscription(
query=response.get("query", ""),
num_results=response.get("numResults", 0),
stream_id=response.get("streamId", ""),
stream_name=response.get("streamName", ""),
)
def get_stream_items_ids(
self,
auth: AuthToken,
stream_id: str,
limit: int = 1000,
direction: Literal["asc", "desc"] = "desc",
start_time: int | None = None,
continuation: str | None = None,
exclude_target: Literal["user/-/state/com.google/read"] | None = None,
include_target: Literal[
"user/-/state/com.google/read", "user/-/state/com.google/starred", "user/-/state/com.google/like"
]
| None = None,
) -> StreamIDs:
"""
Get item IDs for a given stream.
Args:
stream_id(str): ID of the stream to retrieve item IDs from.
limit(int): Maximum number of items to retrieve.
direction(Literal["asc", "desc"]): Direction to retrieve items (ascending or descending).
start_time(int | None): Optional start time for retrieving items.
continuation(str | None): Optional continuation token for pagination.
exclude_target(str | None): Optional target to exclude from results.
include_target(str | None): Optional target to include in results.
Returns:
List of item IDs.
"""
params = {"output": "json", "s": stream_id, "n": limit}
if direction == "asc":
params["r"] = "o"
if start_time:
params["ot"] = start_time
if exclude_target:
params["xt"] = exclude_target
if include_target:
params["it"] = include_target
if continuation:
params["c"] = continuation
response = self._session.get(
f"{self._base_url}/reader/api/0/stream/items/ids",
headers={"Authorization": f"{auth.TokenType} auth={auth.AccessToken}"},
params=params,
)
if response.status_code == 401:
raise AuthenticationError("Authentication failed")
elif response.status_code != 200:
raise ClientError("Failed to get item IDs")
data = response.json()
return StreamIDs(
item_refs=[ItemRef(id=item["id"]) for item in data.get("itemRefs", [])],
continuation=data.get("continuation", ""),
)
def get_stream_items_contents(self, auth: AuthToken, csrf_token: str, item_ids: list[str]) -> StreamContentItems:
"""
Get the contents of items
Args:
auth(AuthToken): Authentication token obtained from the login process.
csrf_token(str): CSRF token for the request.
item_ids(list[str]): List of item IDs to retrieve.
Returns:
StreamContentItems: List of item contents.
Raises:
ClientError: If the request fails or the response is not valid.
AuthenticationError: If the authentication token is invalid.
"""
response = self._session.post(
f"{self._base_url}/reader/api/0/stream/items/contents",
headers={"Authorization": f"{auth.TokenType} auth={auth.AccessToken}"},
params={"output": "json"},
data={"i": item_ids, "T": csrf_token},
)
if response.status_code == 401:
raise AuthenticationError("Authentication failed")
elif response.status_code != 200:
raise ClientError("Failed to get item contents")
data = response.json()
return StreamContentItems(
direction=data.get("direction", ""),
id=data.get("id", ""),
title=data.get("title", ""),
self=[ContentHREF(**item) for item in data.get("self", [])],
updated=data.get("updated", 0),
items=[
ContentItem(
id=item.get("id", ""),
categories=item.get("categories", []),
title=item.get("title", ""),
crawl_time_msec=item.get("crawlTimeMsec", ""),
timestamp_usec=item.get("timestampUsec", ""),
published=item.get("published", 0),
updated=item.get("updated", 0),
author=item.get("author", ""),
alternate=[
ContentHREFType(href=alt.get("href", ""), type=alt.get("type", ""))
for alt in item.get("alternate", [])
],
summary=ContentItemContent(
direction=item.get("summary", {}).get("direction", ""),
content=item.get("summary", {}).get("content", ""),
),
content=ContentItemContent(
direction=item.get("content", {}).get("direction", ""),
content=item.get("content", {}).get("content", ""),
),
origin=ContentItemOrigin(
stream_id=item.get("origin", {}).get("streamId", ""),
title=item.get("origin", {}).get("title", ""),
html_url=item.get("origin", {}).get("htmlUrl", ""),
),
enclosure=[],#ContentItemEnclosure(**enc) for enc in item.get("enclosure", [])],
canonical=[ContentHREF(**can) for can in item.get("canonical", [])],
)
for item in data.get("items", [])
],
author=data.get("author", ""),
)
def edit_tags(
self,
auth: AuthToken,
csrf_token: str,
item_ids: list[str],
add_tags: list[str] | None = None,
remove_tags: list[str] | None = None,
) -> bool:
"""
Edit tags for a list of items.
Args:
auth(AuthToken): Authentication token obtained from the login process.
csrf_token(str): CSRF token for the request.
item_ids(list[str]): List of item IDs to edit tags for.
add_tags(list[str]): List of tags to add.
remove_tags(list[str]): List of tags to remove.
Returns:
bool: True if the operation was successful, False otherwise.
Raises:
ClientError: If the request fails or the response is not valid.
AuthenticationError: If the authentication token is invalid.
"""
data = {"i": item_ids, "T": csrf_token}
if add_tags:
data["a"] = add_tags
if remove_tags:
data["r"] = remove_tags
if not add_tags and not remove_tags:
raise ClientError("No tags to add or remove")
response = self._session.post(
f"{self._base_url}/reader/api/0/edit-tag",
headers={"Authorization": f"{auth.TokenType} auth={auth.AccessToken}"},
params={"output": "json"},
data=data,
)
if response.status_code == 401:
raise AuthenticationError("Authentication failed")
elif response.status_code != 200:
raise ClientError("Failed to edit tags")
return True
def disable_tag(self, auth: AuthToken, csrf_token: str, tag_id: str) -> bool:
"""
Deletes a category or a tag.
Args:
auth(AuthToken): Authentication token obtained from the login process.
csrf_token(str): CSRF token for the request.
tag_id(str): ID of the tag to delete.
Returns:
bool: True if the operation was successful, False otherwise.
Raises:
ClientError: If the request fails or the response is not valid.
AuthenticationError: If the authentication token is invalid.
"""
response = self._session.post(
f"{self._base_url}/reader/api/0/disable-tag",
headers={"Authorization": f"{auth.TokenType} auth={auth.AccessToken}"},
params={"output": "json"},
data={"s": tag_id, "T": csrf_token},
)
if response.status_code == 401:
raise AuthenticationError("Authentication failed")
elif response.status_code != 200:
raise ClientError("Failed to disable tags")
return True
def delete_tag(self, auth: AuthToken, csrf_token: str, tag_id: str) -> bool:
"""
Deletes a category or a tag.
Args:
auth(AuthToken): Authentication token obtained from the login process.
csrf_token(str): CSRF token for the request.
tag_id(str): ID of the tag to delete.
Returns:
bool: True if the operation was successful, False otherwise.
Raises:
ClientError: If the request fails or the response is not valid.
AuthenticationError: If the authentication token is invalid.
"""
return self.disable_tag(auth, csrf_token, tag_id)
def rename_tag(self, auth: AuthToken, csrf_token: str, tag_id: str, new_label_name: str) -> bool:
"""
Rename a category or a tag.
Args:
auth(AuthToken): Authentication token obtained from the login process.
csrf_token(str): CSRF token for the request.
tag_id(str): ID of the tag to rename.
new_label_name(str): New name for the category or tag.
Returns:
bool: True if the operation was successful, False otherwise.
Raises:
ClientError: If the request fails or the response is not valid.
AuthenticationError: If the authentication token is invalid.
"""
response = self._session.post(
f"{self._base_url}/reader/api/0/rename-tag",
headers={"Authorization": f"{auth.TokenType} auth={auth.AccessToken}"},
params={"output": "json"},
data={"s": tag_id, "dest": get_label_id(new_label_name), "T": csrf_token},
)
if response.status_code == 401:
raise AuthenticationError("Authentication failed")
elif response.status_code != 200:
raise ClientError("Failed to rename tags")
return True
def list_tags(self, auth: AuthToken) -> list[Tag]:
"""
Get the list of tags from the Google Reader API.
Args:
auth(AuthToken): Authentication token obtained from the login process.
Returns:
List of Tag objects.
Raises:
ClientError: If the request fails or the response is not valid.
AuthenticationError: If the authentication token is invalid.
"""
response = self._session.get(
f"{self._base_url}/reader/api/0/tag/list",
headers={"Authorization": f"{auth.TokenType} auth={auth.AccessToken}"},
params={"output": "json"},
)
if response.status_code == 401:
raise AuthenticationError("Authentication failed")
elif response.status_code != 200:
raise ClientError("Failed to get tags")
return [Tag(**tag) for tag in response.json().get("tags", [])]
def mark_all_as_read(
self, auth: AuthToken, csrf_token: str, stream_id: str, before_timestamp: int | None = None
) -> bool:
"""
Mark all items in a stream as read.
Args:
auth(AuthToken): Authentication token obtained from the login process.
csrf_token(str): CSRF token for the request.
stream_id(str): ID of the stream to mark as read.
before_timestamp(int | None): Optional timestamp to mark items as read before this time.
Returns:
bool: True if the operation was successful, False otherwise.
Raises:
ClientError: If the request fails or the response is not valid.
AuthenticationError: If the authentication token is invalid.
"""
data = {"s": stream_id, "T": csrf_token}
if before_timestamp:
data["ts"] = str(before_timestamp)
response = self._session.post(
f"{self._base_url}/reader/api/0/mark-all-as-read",
headers={"Authorization": f"{auth.TokenType} auth={auth.AccessToken}"},
data=data,
)
match response.status_code:
case 401:
raise AuthenticationError("Authentication failed")
case 404:
raise ResourceNotFoundError("Stream not found")
case _ if response.status_code != 200:
raise ClientError("Failed to mark all as read")
return True
def get_long_item_id(item_id: int) -> str:
"""
Convert a short item ID to a long item ID.
Args:
item_id(int): Short item ID.
Returns:
Long item ID.
"""
return f"tag:google.com,2005:reader/item/{item_id:016x}"
def get_label_id(label_title: str) -> str:
"""
Convert a label to a label ID.
Args:
label_title(str): Label name.
Returns:
Label stream ID.
"""
return STREAM_TAG.format(label_title=label_title)
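The vendored copy goes away in favor of the google-reader dependency declared in pyproject.toml below. A minimal usage sketch against the client API shown above, assuming the PyPI package keeps this interface (the URL and credentials are placeholders):

import google_reader

client = google_reader.Client("https://reader.example.com")
auth = client.login("alice", "s3cret")  # AuthToken used by every further call
csrf = client.get_token(auth)           # CSRF token, required for mutating calls
# folders are tags of type "folder", as feather's list_categories() does it
folders = [tag for tag in client.list_tags(auth) if tag.type == "folder"]
ids = client.get_stream_items_ids(auth, stream_id=google_reader.STREAM_READING_LIST, limit=10)
contents = client.get_stream_items_contents(auth, csrf, item_ids=[ref.id for ref in ids.item_refs])
client.edit_tags(auth, csrf, item_ids=[ref.id for ref in ids.item_refs], add_tags=[google_reader.STREAM_READ])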

pyproject.toml

@@ -6,9 +6,18 @@ description = "file-based RSS reader client"
 readme = "README.md"
 requires-python = ">=3.12"
 dependencies = [
+    "google-reader>=0.0.3",
     "jinja2>=3.1.6",
     "requests>=2.32.5",
     "ttrss-python>=0.5",
     "tzdata>=2025.2",
 ]
+license = "ISC"
+license-files = [ "LICENSE" ]
+
+[build-system]
+requires = ["uv_build >= 0.9.0"]
+build-backend = "uv_build"
+
+[project.scripts]
+feather = "feather.cli:main"
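With the uv_build backend the package is built from the src/feather/ layout introduced in this commit, and the [project.scripts] entry makes `uv run feather <action>` (or a plain `feather` once the package is installed) dispatch to feather.cli:main; this is the entry point the new Dockerfile's `ENTRYPOINT [ "uv", "run", "feather" ]` relies on.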

src/feather/__init__.py Normal file

src/feather/articledata.py Normal file

@@ -0,0 +1,200 @@
"""Article representation and storage on disk"""
from __future__ import annotations
import os
import json
from abc import ABC, abstractmethod
from datetime import datetime
from pathlib import Path
from hashlib import sha256
from feather.config import Config
def escape_filename(config, filename):
return filename.translate(config.filename_translation)
def truncate_filename(config, filename):
max_filename_length = config.max_filename_length
filename_utf8 = filename.encode("utf-8")
if len(filename_utf8) <= max_filename_length:
return filename
else:
suffix = Path(filename).suffix
max_basename_length = max_filename_length - len(suffix.encode("utf-8"))
cutoff = len(filename.encode('utf-8')[:max_basename_length].decode('utf-8', errors="ignore"))
return filename[:cutoff] + '' + suffix
def format_datetime(config, timestamp):
return datetime.fromtimestamp(timestamp, config.timezone).strftime(config.time_format)
type CategoryId = int | str
class Category:
id: CategoryId # category id
title: str # category name
parents: list[Category] # list of parent categories
order: int = 0 # category display order, starting from 1 (0 if unknown)
def fromdict(d):
parents = [ Category.fromdict(parent) for parent in d["parents"] ]
return Category(d["id"], d["title"], parents, d["order"])
def __init__(self, id, title, parents=[], order=0):
self.id = id
self.title = title
self.parents = parents
self.order = order
def asdict(self):
return {
"id": self.id,
"title": self.title,
"parents": [ dir.asdict() for dir in self.parents ],
"order": self.order
}
type ArticleId = int | str
class Article(ABC):
config: Config
json_path: Path
html_path: str
id: ArticleId # article id
title: str = "" # article title
published: int = 0 # article publication time (timestamp)
published_formatted: str # article publication time (text)
updated: int = 0 # article update time (timestamp)
updated_formatted: str # article publication time (text)
author: str = "" # article author
summary: str = "" # article summary (HTML)
content: str = "" # article content (HTML)
feed_title: str = "" # feed title
feed_url: str = "" # feed URL
feed_icon_url: str = "" # feed icon URL
feed_order: int = 0 # feed display order, starting from 1 (0 if unknown)
article_url: str = "" # article URL
comments_url: str = "" # article comments URL
language: str = "" # article language
image_url: str = "" # article main image
category: Category # feed category
def get_html_path(self):
config = self.config
category_directory = config.html_root
for category in self.category.parents:
category_directory /= escape_filename(config, config.item_category_template.render(category.asdict()))
category_directory /= escape_filename(config, config.item_category_template.render(self.category.asdict()))
html_name = truncate_filename(config, escape_filename(config, config.item_filename_template.render(self.get_template_dict())))
return category_directory / html_name
def compute_fields(self):
config = self.config
self.updated_formatted = format_datetime(config, self.updated)
self.published_formatted = format_datetime(config, self.published)
self.json_path = config.json_root / f"{ sha256(str(self.id).encode("utf-8")).hexdigest() }.json"
self.html_path = str(self.get_html_path().relative_to(config.html_root)) # TODO: do this dynamically on write, handle overwrite conflict at the same time
def get_template_dict(self) -> dict:
template_fields = ("id", "title", "published", "published_formatted", "updated", "updated_formatted", "author", "summary", "content", "feed_title", "feed_url", "feed_icon_url", "feed_order", "article_url", "comments_url", "language", "image_url")
d = { field: getattr(self, field) for field in template_fields }
d["category"] = self.category.asdict()
return d
def write_json(self):
stored_fields = ("id", "title", "published", "published_formatted", "updated", "updated_formatted", "author", "summary", "content", "feed_title", "feed_url", "feed_icon_url", "feed_order", "article_url", "comments_url", "language", "image_url", "html_path")
item_json = { field: getattr(self, field) for field in stored_fields }
item_json["category"] = self.category.asdict()
if self.json_path.exists():
raise Exception
with self.json_path.open("w") as f:
json.dump(item_json, f)
def delete_json(self):
self.json_path.unlink()
def write_html(self):
# Write HTML file for a JSON object
config = self.config
html_path = config.html_root / self.html_path
if html_path.exists(): # TODO: does this actually matter
print(f"WARNING: a file already exist for {html_path}. Either the feed has duplicate entries, or something has gone terribly wrong.")
else:
html_path.parent.mkdir(parents=True, exist_ok=True)
with html_path.open("w") as f:
f.write(config.item_template.render(self.get_template_dict()))
# set accessed date to update time, modified to publication time
os.utime(html_path, (max(self.updated, self.updated), self.published))
def delete_html(self, ignore_deleted=False):
# Delete a HTML file for a JSON object
html_path = self.config.html_root / self.html_path
if not ignore_deleted or html_path.exists():
html_path.unlink()
def write(self):
self.write_json()
self.write_html()
def delete(self):
self.delete_html(ignore_deleted=True)
self.delete_json()
def regenerate(self):
self.delete() # paths might change so we preemptively remove the old file
self.compute_fields() # recompute formatted datetime & paths from the current configuration
self.write() # rewrite JSON & HTML
class GReaderArticle(Article):
def __init__(self, session: GReaderSession, category: Category, item_content):
self.config = session.config
self.category = category
self.id = item_content.id
self.title = item_content.title
self.published = item_content.published
self.updated = item_content.updated
self.author = item_content.author
self.summary = item_content.summary.content
self.content = item_content.content.content
self.feed_title = item_content.origin.title
self.feed_url = item_content.origin.html_url
self.article_url = item_content.canonical[0].href
self.compute_fields()
class TTRArticle(Article):
def __init__(self, session: TRRSession, category: Category, article):
self.config = session.config
self.category = category
self.id = article.id
self.title = article.title
self.published = article.updated.timestamp()
self.updated = article.updated.timestamp()
self.author = article.author
self.summary = article.excerpt
self.content = article.content
self.feed_title = article.feed_title
self.feed_url = article.site_url
self.feed_icon_url = session.feeds[article.feed_id]["icon"]
self.feed_order = session.feeds[article.feed_id]["order"]
self.article_url = article.link
self.comments_url = article.comments_link
self.language = article.lang
self.image_url = article.flavor_image
self.compute_fields()
class FileArticle(Article):
def __init__(self, config: Config, json_path: Path) -> Article:
self.config = config
self.json_path = json_path
item_json = json.load(json_path.open("r"))
for field in item_json:
setattr(self, field, item_json[field])
self.category = Category.fromdict(item_json["category"])
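A small sketch of the byte-budgeted truncation above; the stand-in config object is hypothetical glue, feather passes its real Config:

from types import SimpleNamespace
from feather.articledata import truncate_filename

cfg = SimpleNamespace(max_filename_length=16)
# the cut is applied on UTF-8 bytes and decoded with errors="ignore", so a
# multi-byte character is never split in half; the suffix is kept intact
print(truncate_filename(cfg, "une-très-longue-entrée.html"))  # -> une-très-l.html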

src/feather/cli.py Normal file

@@ -0,0 +1,41 @@
#!/usr/bin/python3
"""Commandline interface to feather"""
import argparse
import asyncio

from feather.config import Config
from feather.feather import FeatherApp

def main():
    parser = argparse.ArgumentParser(
        prog="feather",
        description="file-based RSS reader client"
    )
    parser.add_argument(
        "action", choices=("sync", "sync-up", "sync-down", "daemon", "regenerate", "clear-data"),
        help="sync: perform a full synchronization with the server; sync-up: only synchronize local changes to the server (e.g. items read locally); sync-down: only synchronize remote changes from the server (e.g. new items or items read from another device); daemon: start in daemon mode (will keep performing synchronizations periodically until process is stopped); regenerate: regenerate all HTML files from the local data; clear-data: remove all local data"
    )
    args = parser.parse_args()

    config = Config()
    app = FeatherApp(config)
    if args.action == "sync":
        app.synchronize()
    elif args.action == "sync-up":
        app.synchronize_local_changes()
    elif args.action == "sync-down":
        app.synchronize_remote_changes()
    elif args.action == "daemon":
        try:
            asyncio.run(app.daemon())
        except KeyboardInterrupt:
            pass
    elif args.action == "regenerate":
        app.regenerate_files()
    elif args.action == "clear-data":
        app.clear_data()

if __name__ == "__main__":
    main()
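After a `uv sync`, the actions map onto the old single-file commands, for example `uv run feather sync` for a one-shot synchronization, or `CONFIG_PATH=/etc/feather/config.toml uv run feather daemon` (the path is a placeholder) to run the periodic loops with a configuration file outside the working directory; CONFIG_PATH is resolved by Config in src/feather/config.py below.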

config.default.toml

@@ -14,8 +14,9 @@ password = "password"
 # How many items to retrieve at most from the server in a single request. Lower values will make synchronization slower, higher values might make the server complain.
 # If you are using the Google Reader API: servers should be okay with up to 1000.
 # If you are using the ttrss API: servers should be okay with up to 200.
+# Set to 0 to let feather choose.
 # Can be set through the environment variable SERVER_ITEMS_PER_REQUEST.
-items_per_request = 500
+items_per_request = 0

 [directories]
 # Directory path where the internal feather data will be stored.
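The 0 sentinel is resolved in src/feather/config.py below: when items_per_request is 0, feather substitutes 1000 for the Google Reader API and 200 for ttrss, matching the per-API limits mentioned in the comments above.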

src/feather/config.py Normal file

@@ -0,0 +1,71 @@
"""Feather configuration file"""
import os
import tomllib
from zoneinfo import ZoneInfo
from pathlib import Path
from jinja2 import Template
class ConfigurationError(ValueError):
pass
default_config_path = Path(__file__).parent / "config.default.toml"
class Config:
def __init__(self):
with default_config_path.open("rb") as f:
default_config = tomllib.load(f)
config_path = Path(os.environ.get("CONFIG_PATH") or "config.toml")
if config_path.exists():
with config_path.open("rb") as f:
config = tomllib.load(f)
elif "CONFIG_PATH" in os.environ:
raise ConfigurationError(f"configuration file {config_path} does not exist; create it or change the CONFIG_PATH environment variable to another path")
else:
config = {}
def get_config(category, field, can_default=True):
env_name = f"{category.upper()}_{field.upper()}"
c = config.get(category, {})
if env_name in os.environ:
return os.environ[env_name]
elif field in c:
return c[field]
elif can_default:
return default_config[category][field]
else:
raise ConfigurationError(f"{category}.{field} required but not found in configuration file {config_path} nor in environment variable {env_name}")
# Get config fields
self.html_root: Path = Path(get_config("directories", "reader"))
self.json_root: Path = Path(get_config("directories", "data"))
self.server_api: str = str(get_config("server", "api"))
if self.server_api not in ("googlereader", "ttrss"):
raise ConfigurationError(f"server.api must be either ttrss or googlereader")
self.server_url: str = str(get_config("server", "url", False))
self.server_user: str = str(get_config("server", "user", False))
self.server_password: str = str(get_config("server", "password", False))
self.items_per_query: int = int(get_config("server", "items_per_request"))
if self.items_per_query == 0:
self.items_per_query = 1000 if self.server_api == "googlereader" else 200
self.timezone: ZoneInfo = ZoneInfo(str(get_config("datetime", "timezone")))
self.time_format: str = str(get_config("datetime", "format"))
self.item_template: Template = Template(str(get_config("html", "template")), autoescape=True)
self.item_filename_template: Template = Template(str(get_config("html", "filename_template")), autoescape=False)
self.item_category_template: Template = Template(str(get_config("html", "category_template")), autoescape=False)
self.max_filename_length: int = int(get_config("html", "max_filename_length"))
self.filename_translation = str.maketrans(get_config("html", "filename_replacement"))
self.daemon_sync_up_every: int = int(get_config("daemon", "sync_up_every"))
self.daemon_sync_down_every: int = int(get_config("daemon", "sync_down_every"))
# Computed config fields
self.update_lock: Path = self.json_root / "update.lock"
# Create missing directories
self.html_root.mkdir(exist_ok=True)
self.json_root.mkdir(exist_ok=True)
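Each field resolves in order: environment variable, then config.toml, then the packaged default; the variable name is the TOML table and key upper-cased and joined by an underscore. A minimal sketch with placeholder values:

import os

# server.url/user/password have no default, so they must come either from
# config.toml or from these variables (placeholder values)
os.environ["SERVER_URL"] = "https://reader.example.com"
os.environ["SERVER_USER"] = "alice"
os.environ["SERVER_PASSWORD"] = "s3cret"

from feather.config import Config

config = Config()  # raises ConfigurationError if a required field is still missing
# side effect: the reader/ and data/ directories are created if missing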

src/feather/feather.py Executable file

@@ -0,0 +1,166 @@
"""Main feather application"""
import asyncio
import signal
from feather.config import Config
from feather.feedreaderclient import GReaderSession, TTRSession, ClientSession
from feather.articledata import FileArticle
class FeatherApp:
config: Config
def __init__(self, config: Config):
self.config = config
self._client_session = None
_client_session: ClientSession
def get_client_session(self) -> ClientSession:
"""Connect to the server and return a ClientSession object; return an existing ClientSession if we are already connected"""
if not self._client_session:
config = self.config
api = config.server_api
if api == "googlereader":
self._client_session = GReaderSession(config)
elif api == "ttrss":
self._client_session = TTRSession(config)
else:
raise ValueError(f"{api} server type is invalid; must be ttrss or googlereader")
return self._client_session
def remove_empty_html_directories(self):
"""Remove empty directories in the HTML directory"""
config = self.config
html_root = config.html_root
removed_directories = set()
for (dirpath, dirnames, filenames) in html_root.walk(top_down=False):
if dirpath != html_root:
is_empty = len(filenames) == 0
if is_empty and len(dirnames) > 0: # some subdirectories may have been removed in an earlier iteration
for subdirname in dirnames:
if dirpath / subdirname not in removed_directories:
is_empty = False
break
if is_empty:
dirpath.rmdir()
removed_directories.add(dirpath)
def mark_deleted_as_read(self):
"""Mark items that are in the JSON directory but with missing HTML file as read on the server"""
config = self.config
client_session = self.get_client_session()
if config.update_lock.exists():
print("The previous synchronization was aborted, not marking any item as read in order to avoid collateral damage")
return
marked_as_read = 0
to_mark_as_read = []
for json_path in config.json_root.glob("*.json"):
article = FileArticle(config, json_path)
html_path = config.html_root / article.html_path
if not html_path.exists():
to_mark_as_read.append(article.id)
article.delete()
marked_as_read += 1
for i in range(0, len(to_mark_as_read), config.items_per_query):
client_session.mark_as_read(to_mark_as_read[i:i+config.items_per_query])
print(f"Marked {marked_as_read} items as read")
def synchronize_with_server(self):
"""Synchronize items from the server, generating and deleting JSON and HTML files accordingly"""
config = self.config
client_session = self.get_client_session()
config.update_lock.touch()
print("Synchronizing with server...")
new_items, updated_items = 0, 0
grabbed_item_paths = set()
categories = client_session.list_categories()
for category in categories:
print(f" Updating category {category.title}")
remaining, continuation = True, 0
while remaining:
articles = client_session.get_unread_articles_in_category(category, limit=config.items_per_query, continuation=continuation)
if len(articles) >= config.items_per_query:
continuation += len(articles)
else:
remaining = False
for item in articles:
json_path = item.json_path
grabbed_item_paths.add(json_path)
if not json_path.exists():
item.write()
new_items += 1
else:
old_item = FileArticle(config, json_path)
if item.updated > old_item.updated:
old_item.delete()
item.write()
updated_items += 1
# Remove items that we didn't get from the server but are in the JSON directory
removed_items = 0
for item_path in config.json_root.glob("*.json"):
if not item_path in grabbed_item_paths:
FileArticle(config, item_path).delete()
removed_items += 1
print(f"Synchronization successful ({new_items} new items, {updated_items} updated, {removed_items} removed)")
config.update_lock.unlink()
def synchronize(self):
"""Do a full feather update"""
self.mark_deleted_as_read()
self.synchronize_with_server()
self.remove_empty_html_directories()
def synchronize_local_changes(self):
"""Upload local changes (read items) to the server"""
self.mark_deleted_as_read()
self.remove_empty_html_directories()
def synchronize_remote_changes(self):
"""Download remote changes (new items, items read from another device) from the server"""
self.synchronize_with_server()
self.remove_empty_html_directories()
async def daemon_sync_up_loop(self):
while True:
self.synchronize_local_changes()
await asyncio.sleep(self.config.daemon_sync_up_every)
async def daemon_sync_down_loop(self):
while True:
self.synchronize_remote_changes()
await asyncio.sleep(self.config.daemon_sync_down_every)
async def daemon(self):
"""Start the synchronization daemon"""
config = self.config
print(f"Started in daemon mode; changes will be downloaded from the server every {config.daemon_sync_down_every}s and uploaded every {config.daemon_sync_up_every}s")
async with asyncio.TaskGroup() as tg:
tup = tg.create_task(self.daemon_sync_up_loop())
tdown = tg.create_task(self.daemon_sync_down_loop())
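# On SIGTERM, cancel both loops so the TaskGroup unwinds and the daemon exits cleanly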
def cancel_tasks():
tup.cancel()
tdown.cancel()
asyncio.get_running_loop().add_signal_handler(signal.SIGTERM, cancel_tasks)
def regenerate_files(self):
"""Regenerate all local files using local data only"""
config = self.config
for json_path in config.json_root.glob("*.json"):
FileArticle(config, json_path).regenerate()
def clear_data(self):
"""Delete all local data"""
config = self.config
for json_path in config.json_root.glob("*.json"):
FileArticle(config, json_path).delete()
self.remove_empty_html_directories()

View file

@ -0,0 +1,98 @@
"""Connection between the remote server and feather"""
import re
from abc import ABC, abstractmethod
from ttrss.client import TTRClient
import google_reader
from feather.config import Config
from feather.articledata import Article, GReaderArticle, TTRArticle, ArticleId, Category, CategoryId
class ClientSession(ABC):
config: Config
@abstractmethod
def mark_as_read(self, item_ids: list[ArticleId]):
"""Mark all the given articles as read."""
pass
@abstractmethod
def list_categories(self) -> list[Category]:
"""Returns a list of all the categories on the server."""
pass
@abstractmethod
def get_unread_articles_in_category(self, category: Category, limit: int, continuation: int=0) -> list[Article]:
"""Return a list of unread Articles in the given category; limit and continuation control pagination."""
pass
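# Extracts the human-readable label from a Google Reader tag id of the form user/<user id>/label/<label>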
label_name = re.compile("user/.*/label/(.*)")
class GReaderSession(ClientSession):
"""Google Reader API client"""
greader: google_reader.Client
auth_token: str
csrf_token: str
def __init__(self, config: Config):
self.config = config
self.greader = google_reader.Client(config.server_url)
self.auth_token = self.greader.login(config.server_user, config.server_password)
self.csrf_token = self.greader.get_token(self.auth_token)
def mark_as_read(self, item_ids: list[ArticleId]):
self.greader.edit_tags(self.auth_token, self.csrf_token, item_ids=item_ids, add_tags=[google_reader.STREAM_READ])
def list_categories(self) -> list[Category]:
categories = [tag for tag in self.greader.list_tags(self.auth_token) if tag.type == "folder"]
result = []
for category in categories:
category_name = category.label or label_name.search(category.id).group(1)
result.append(Category(id=category.id, title=category_name))
return result
def get_unread_articles_in_category(self, category, limit=500, continuation=0) -> list[GReaderArticle]:
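# Two-step fetch: first the ids of the unread items in the stream, then the contents of those items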
items_ids = self.greader.get_stream_items_ids(self.auth_token, stream_id=category.id, exclude_target="user/-/state/com.google/read", limit=limit, continuation=continuation)
item_contents = self.greader.get_stream_items_contents(self.auth_token, self.csrf_token, item_ids=[item.id for item in items_ids.item_refs])
return [ GReaderArticle(self, category, item_content) for item_content in item_contents.items ]
class TTRSession(ClientSession):
"""Tiny Tiny RSS API client"""
ttrss: TTRClient
feeds: dict
def __init__(self, config: Config):
self.config = config
self.ttrss = TTRClient(config.server_url, config.server_user, config.server_password, auto_login=True)
self.ttrss.login()
self.feeds = {}
def mark_as_read(self, item_ids: list[ArticleId]):
self.ttrss.mark_read(item_ids)
def list_categories(self) -> list[Category]:
self.feeds = {}
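# tt-rss returns a nested feed tree; flatten it into Category objects while tracking each category's ancestors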
def get_categories_recursive(parent_category, parent_categories=None):
parent_categories = parent_categories or []
categories = []
index = 1
for item in parent_category["items"]:
# skip special categories and feeds
if item["bare_id"] <= 0:
continue
# category
elif item.get("type") == "category":
category = Category(id=item["bare_id"], parents=parent_categories, title=item["name"], order=index)
categories.append(category)
categories += get_categories_recursive(item, parent_categories+[category])
# feeds
elif "type" not in item:
self.feeds[item["bare_id"]] = item
self.feeds[item["bare_id"]]["order"] = index
index += 1
return categories
tree = self.ttrss.get_feed_tree()
return get_categories_recursive(tree["categories"])
def get_unread_articles_in_category(self, category, limit=100, continuation=0) -> list[TTRArticle]:
headlines = self.ttrss.get_headlines(feed_id=category.id, limit=limit, skip=continuation, is_cat=True, show_excerpt=True, show_content=True, view_mode="unread", include_attachments=False, include_nested=False)
return [ TTRArticle(self, category, headline) for headline in headlines ]
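A minimal sketch of how these session classes are driven, assuming a populated Config and that Article objects expose the id attribute used elsewhere in feather (the module path in the import is hypothetical):

from feather.config import Config
from feather.serverclient import GReaderSession, TTRSession  # hypothetical module path

config = Config()
# Pick the concrete session the same way feather does, based on config.server_api
session = GReaderSession(config) if config.server_api == "googlereader" else TTRSession(config)
for category in session.list_categories():
    # Fetch one page of unread articles, then mark them as read in a single batch
    articles = session.get_unread_articles_in_category(category, limit=100)
    session.mark_as_read([article.id for article in articles])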

27
uv.lock generated
View file

@ -56,18 +56,34 @@ wheels = [
[[package]]
name = "feather"
version = "0.1.0"
source = { virtual = "." }
source = { editable = "." }
dependencies = [
{ name = "google-reader" },
{ name = "jinja2" },
{ name = "requests" },
{ name = "ttrss-python" },
{ name = "tzdata" },
]
[package.metadata]
requires-dist = [
{ name = "google-reader", specifier = ">=0.0.3" },
{ name = "jinja2", specifier = ">=3.1.6" },
{ name = "requests", specifier = ">=2.32.5" },
{ name = "ttrss-python", specifier = ">=0.5" },
{ name = "tzdata", specifier = ">=2025.2" },
]
[[package]]
name = "google-reader"
version = "0.0.3"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "requests" },
]
sdist = { url = "https://files.pythonhosted.org/packages/37/0e/72617daa38fa0eaa5ad6e9e6ba25a6dd1186595fbd19e65b70104c799811/google_reader-0.0.3.tar.gz", hash = "sha256:397d65d772d353f0be3137b05f4a4d0ca5a4d5c0545456769f495e37c0178629", size = 10110, upload-time = "2025-05-05T04:33:16.838Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/32/ae/2750424c1ca9d3be30036ed7091e8288bc9754450719f1000067f7c84117/google_reader-0.0.3-py3-none-any.whl", hash = "sha256:7747c1b48e72a3b988364211c13e7f5d30655023102ef64e5177f532851a1436", size = 10076, upload-time = "2025-05-05T04:33:15.334Z" },
]
[[package]]
@ -178,6 +194,15 @@ dependencies = [
]
sdist = { url = "https://files.pythonhosted.org/packages/a2/72/786e2edf469d6d1e048f3dd043a50ececf7674d10402d703d1297bb6e102/ttrss-python-0.5.tar.gz", hash = "sha256:ad7816b85e3c0b13822f321f91ed7f19dc3b82237f2d7838c2dcb9aac0f4ca07", size = 6247, upload-time = "2015-09-02T08:53:06.221Z" }
[[package]]
name = "tzdata"
version = "2025.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/95/32/1a225d6164441be760d75c2c42e2780dc0873fe382da3e98a2e1e48361e5/tzdata-2025.2.tar.gz", hash = "sha256:b60a638fcc0daffadf82fe0f57e53d06bdec2f36c4df66280ae79bce6bd6f2b9", size = 196380, upload-time = "2025-03-23T13:54:43.652Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/5c/23/c7abc0ca0a1526a0774eca151daeb8de62ec457e77262b66b359c3c7679e/tzdata-2025.2-py2.py3-none-any.whl", hash = "sha256:1a403fada01ff9221ca8044d701868fa132215d84beb92242d9acd2147f667a8", size = 347839, upload-time = "2025-03-23T13:54:41.845Z" },
]
[[package]]
name = "urllib3"
version = "2.5.0"