1
0
Fork 0
mirror of https://codeberg.org/Reuh/feather.git synced 2025-10-27 10:09:32 +00:00

feat: make ruff happy

This commit is contained in:
Étienne Fildadut 2025-10-11 15:36:11 +02:00
parent 00001ed4b0
commit 4438c48631
5 changed files with 276 additions and 93 deletions

View file

@ -8,11 +8,12 @@ import textwrap
from feather.config import Config
from feather.feather import FeatherApp
def main():
parser = argparse.ArgumentParser(
prog="feather",
description="file-based RSS reader client",
formatter_class=argparse.RawTextHelpFormatter
formatter_class=argparse.RawTextHelpFormatter,
)
parser.add_argument(
"action",
@ -47,5 +48,6 @@ def main():
elif args.action == "clear-data":
app.clear_data()
if __name__ == "__main__":
main()

View file

@ -10,6 +10,7 @@ import google_reader
from feather.config import Config
from feather.data import Article, ArticleId, Category
class ClientSession(ABC):
config: Config
@ -24,13 +25,23 @@ class ClientSession(ABC):
pass
@abstractmethod
def get_articles_in_category(self, category: Category, limit: int, continuation: int = 0, unread_only: bool = False) -> list[Article]:
def get_articles_in_category(
self,
category: Category,
limit: int,
continuation: int = 0,
unread_only: bool = False,
) -> list[Article]:
"""Returns a list of Articles in the given category. limit and continuation are required for pagination."""
pass
label_name = re.compile("user/.*/label/(.*)")
class GReaderSession(ClientSession):
"""Google Reader API client"""
greader: google_reader.Client
auth_token: str
csrf_token: str
@ -43,20 +54,40 @@ class GReaderSession(ClientSession):
def set_read_flag(self, article_ids: list[ArticleId], read: bool = True):
if read:
self.greader.edit_tags(self.auth_token, self.csrf_token, item_ids=article_ids, add_tags=[google_reader.STREAM_READ])
self.greader.edit_tags(
self.auth_token,
self.csrf_token,
item_ids=article_ids,
add_tags=[google_reader.STREAM_READ],
)
else:
self.greader.edit_tags(self.auth_token, self.csrf_token, item_ids=article_ids, remove_tags=[google_reader.STREAM_READ])
self.greader.edit_tags(
self.auth_token,
self.csrf_token,
item_ids=article_ids,
remove_tags=[google_reader.STREAM_READ],
)
def list_categories(self) -> list[Category]:
categories = [tag for tag in self.greader.list_tags(self.auth_token) if tag.type == "folder"]
l = []
for category in categories:
tags = [
tag
for tag in self.greader.list_tags(self.auth_token)
if tag.type == "folder"
]
categories = []
for category in tags:
category_name = category.label or label_name.search(category.id).group(1)
category_id = category.id
l.append(Category(id=category_id, title=category_name))
return l
categories.append(Category(id=category_id, title=category_name))
return categories
def get_articles_in_category(self, category: Category, limit: int = 1000, continuation: int = 0, unread_only: bool = False) -> list[GReaderArticle]:
def get_articles_in_category(
self,
category: Category,
limit: int = 1000,
continuation: int = 0,
unread_only: bool = False,
) -> list[GReaderArticle]:
item_ids = [
item.id
for item in self.greader.get_stream_items_ids(
@ -70,8 +101,14 @@ class GReaderSession(ClientSession):
if len(item_ids) == 0:
return []
else:
item_contents = self.greader.get_stream_items_contents(self.auth_token, self.csrf_token, item_ids=item_ids)
return [ GReaderArticle(self, category, item_content) for item_content in item_contents.items ]
item_contents = self.greader.get_stream_items_contents(
self.auth_token, self.csrf_token, item_ids=item_ids
)
return [
GReaderArticle(self, category, item_content)
for item_content in item_contents.items
]
class GReaderArticle(Article):
def __init__(self, session: GReaderSession, category: Category, item_content):
@ -93,14 +130,21 @@ class GReaderArticle(Article):
self.compute_fields()
class TTRSession(ClientSession):
"""Tiny Tiny RSS API client"""
ttrss: TTRClient
feeds: dict
def __init__(self, config: Config):
self.config = config
self.ttrss = TTRClient(config.server_url, config.server_user, config.server_password, auto_login=True)
self.ttrss = TTRClient(
config.server_url,
config.server_user,
config.server_password,
auto_login=True,
)
self.ttrss.login()
self.feeds = {}
@ -112,6 +156,7 @@ class TTRSession(ClientSession):
def list_categories(self) -> list[Category]:
self.feeds = {}
def get_categories_recursive(parent_category, parent_categories=[]):
categories = []
index = 1
@ -121,19 +166,33 @@ class TTRSession(ClientSession):
continue
# category
elif item.get("type") == "category":
category = Category(id=item["bare_id"], parents=parent_categories, title=item["name"], order=index)
category = Category(
id=item["bare_id"],
parents=parent_categories,
title=item["name"],
order=index,
)
categories.append(category)
categories += get_categories_recursive(item, parent_categories+[category])
categories += get_categories_recursive(
item, parent_categories + [category]
)
# feeds
elif "type" not in item:
self.feeds[item["bare_id"]] = item
self.feeds[item["bare_id"]]["order"] = index
index += 1
return categories
tree = self.ttrss.get_feed_tree()
return get_categories_recursive(tree["categories"])
def get_articles_in_category(self, category: Category, limit: int = 200, continuation: int = 0, unread_only: bool = False) -> list[TTRArticle]:
def get_articles_in_category(
self,
category: Category,
limit: int = 200,
continuation: int = 0,
unread_only: bool = False,
) -> list[TTRArticle]:
headlines = self.ttrss.get_headlines(
feed_id=category.id,
limit=limit,
@ -145,7 +204,8 @@ class TTRSession(ClientSession):
include_attachments=False,
include_nested=False,
)
return [ TTRArticle(self, category, headline) for headline in headlines ]
return [TTRArticle(self, category, headline) for headline in headlines]
class TTRArticle(Article):
def __init__(self, session: TTRSession, category: Category, article):

View file

@ -6,11 +6,14 @@ from zoneinfo import ZoneInfo
from pathlib import Path
from jinja2 import Template
class ConfigurationError(ValueError):
pass
default_config_path = Path(__file__).parent / "config.default.toml"
class Config:
def __init__(self):
with default_config_path.open("rb") as f:
@ -21,7 +24,9 @@ class Config:
with config_path.open("rb") as f:
config = tomllib.load(f)
elif "CONFIG_PATH" in os.environ:
raise ConfigurationError(f"configuration file {config_path} does not exist; create it or change the CONFIG_PATH environment variable to another path")
raise ConfigurationError(
f"configuration file {config_path} does not exist; create it or change the CONFIG_PATH environment variable to another path"
)
else:
config = {}
@ -35,7 +40,9 @@ class Config:
elif can_default:
return default_config[category][field]
else:
raise ConfigurationError(f"{category}.{field} required but not found in configuration file {config_path} nor in environment variable {env_name}")
raise ConfigurationError(
f"{category}.{field} required but not found in configuration file {config_path} nor in environment variable {env_name}"
)
# Get config fields
self.html_root: Path = Path(get_config("directories", "reader"))
@ -43,24 +50,38 @@ class Config:
self.server_api: str = str(get_config("server", "api"))
if self.server_api not in ("googlereader", "ttrss"):
raise ConfigurationError(f"server.api must be either ttrss or googlereader, not {self.server_api}")
raise ConfigurationError(
f"server.api must be either ttrss or googlereader, not {self.server_api}"
)
self.server_url: str = str(get_config("server", "url", False))
self.server_user: str = str(get_config("server", "user", False))
self.server_password: str = str(get_config("server", "password", False))
self.articles_per_query: int = int(get_config("server", "articles_per_request"))
if self.articles_per_query == 0:
self.articles_per_query = 1000 if self.server_api == "googlereader" else 200
self.only_sync_unread_articles: bool = bool(get_config("server", "only_sync_unread_articles"))
self.only_sync_unread_articles: bool = bool(
get_config("server", "only_sync_unread_articles")
)
self.timezone: ZoneInfo = ZoneInfo(str(get_config("datetime", "timezone")))
self.time_format: str = str(get_config("datetime", "format"))
self.article_template: Template = Template(str(get_config("html", "article_template")), autoescape=True)
self.article_filename_template: Template = Template(str(get_config("html", "filename_template")), autoescape=False)
self.article_category_template: Template = Template(str(get_config("html", "category_template")), autoescape=False)
self.hide_empty_categories: bool = bool(get_config("html", "hide_empty_categories"))
self.article_template: Template = Template(
str(get_config("html", "article_template")), autoescape=True
)
self.article_filename_template: Template = Template(
str(get_config("html", "filename_template")), autoescape=False
)
self.article_category_template: Template = Template(
str(get_config("html", "category_template")), autoescape=False
)
self.hide_empty_categories: bool = bool(
get_config("html", "hide_empty_categories")
)
self.max_filename_length: int = int(get_config("html", "max_filename_length"))
self.filename_translation = str.maketrans(get_config("html", "filename_replacement"))
self.filename_translation = str.maketrans(
get_config("html", "filename_replacement")
)
self.daemon_sync_up_every: int = int(get_config("daemon", "sync_up_every"))
self.daemon_sync_down_every: int = int(get_config("daemon", "sync_down_every"))

View file

@ -11,9 +11,11 @@ from hashlib import sha256
from feather.config import Config
def escape_filename(config, filename):
    """Sanitize *filename* using the character translation table from config.

    The table (config.filename_translation, built with str.maketrans from the
    html.filename_replacement setting) maps filesystem-unsafe characters to
    replacements in a single pass.
    """
    table = config.filename_translation
    return filename.translate(table)
def truncate_filename(config, filename):
max_filename_length = config.max_filename_length
filename_utf8 = filename.encode("utf-8")
@ -22,14 +24,23 @@ def truncate_filename(config, filename):
else:
suffix = Path(filename).suffix
max_basename_length = max_filename_length - len(suffix.encode("utf-8"))
cutoff = len(filename.encode('utf-8')[:max_basename_length].decode('utf-8', errors="ignore"))
return filename[:cutoff] + '' + suffix
cutoff = len(
filename.encode("utf-8")[:max_basename_length].decode(
"utf-8", errors="ignore"
)
)
return filename[:cutoff] + "" + suffix
def format_datetime(config, timestamp):
    """Render a unix *timestamp* as a string.

    The timestamp is localized to config.timezone (a ZoneInfo) and formatted
    with the strftime pattern from config.time_format.
    """
    localized = datetime.fromtimestamp(timestamp, config.timezone)
    return localized.strftime(config.time_format)
type CategoryId = int | str
class Category:
id: CategoryId # category id
title: str # category name
@ -37,7 +48,7 @@ class Category:
order: int = 0 # category display order, starting from 1 (0 if unknown)
def fromdict(d):
parents = [ Category.fromdict(parent) for parent in d["parents"] ]
parents = [Category.fromdict(parent) for parent in d["parents"]]
return Category(d["id"], d["title"], parents, d["order"])
def __init__(self, id, title, parents=[], order=0):
@ -50,12 +61,14 @@ class Category:
return {
"id": self.id,
"title": self.title,
"parents": [ dir.asdict() for dir in self.parents ],
"order": self.order
"parents": [dir.asdict() for dir in self.parents],
"order": self.order,
}
type ArticleId = int | str
class Article(ABC):
config: Config
json_path: Path
@ -90,10 +103,20 @@ class Article(ABC):
config = self.config
category_directory = config.html_root
for category in self.category.parents:
category_directory /= escape_filename(config, config.article_category_template.render(category.asdict()))
category_directory /= escape_filename(config, config.article_category_template.render(self.category.asdict()))
category_directory /= escape_filename(
config, config.article_category_template.render(category.asdict())
)
category_directory /= escape_filename(
config, config.article_category_template.render(self.category.asdict())
)
html_name = truncate_filename(config, escape_filename(config, config.article_filename_template.render(self.get_template_dict())))
html_name = truncate_filename(
config,
escape_filename(
config,
config.article_filename_template.render(self.get_template_dict()),
),
)
return category_directory / html_name
@ -101,23 +124,70 @@ class Article(ABC):
config = self.config
self.updated_formatted = format_datetime(config, self.updated)
self.published_formatted = format_datetime(config, self.published)
self.json_path = config.json_root / f"{ sha256(str(self.id).encode("utf-8")).hexdigest() }.json"
self.html_path = str(self.get_html_path().relative_to(config.html_root)) # TODO: do this dynamically on write, handle overwrite conflict at the same time
self.json_path = (
config.json_root
/ f"{sha256(str(self.id).encode('utf-8')).hexdigest()}.json"
)
self.html_path = str(
self.get_html_path().relative_to(config.html_root)
) # TODO: do this dynamically on write, handle overwrite conflict at the same time
def get_template_dict(self) -> dict:
template_fields = ("id", "unread", "title", "published", "published_formatted", "updated", "updated_formatted", "author", "summary", "content", "feed_title", "feed_url", "feed_icon_url", "feed_order", "article_url", "comments_url", "language", "image_url")
d = { field: getattr(self, field) for field in template_fields }
template_fields = (
"id",
"unread",
"title",
"published",
"published_formatted",
"updated",
"updated_formatted",
"author",
"summary",
"content",
"feed_title",
"feed_url",
"feed_icon_url",
"feed_order",
"article_url",
"comments_url",
"language",
"image_url",
)
d = {field: getattr(self, field) for field in template_fields}
d["category"] = self.category.asdict()
return d
def write_json(self):
stored_fields = ("id", "unread", "title", "published", "published_formatted", "updated", "updated_formatted", "author", "summary", "content", "feed_title", "feed_url", "feed_icon_url", "feed_order", "article_url", "comments_url", "language", "image_url", "html_path")
article_json = { field: getattr(self, field) for field in stored_fields }
stored_fields = (
"id",
"unread",
"title",
"published",
"published_formatted",
"updated",
"updated_formatted",
"author",
"summary",
"content",
"feed_title",
"feed_url",
"feed_icon_url",
"feed_order",
"article_url",
"comments_url",
"language",
"image_url",
"html_path",
)
article_json = {field: getattr(self, field) for field in stored_fields}
article_json["category"] = self.category.asdict()
if self.json_path.exists():
raise Exception(f"Unexpectedly tried to overwrite article data for {self.json_path}")
raise Exception(
f"Unexpectedly tried to overwrite article data for {self.json_path}"
)
with self.json_path.open("w") as f:
json.dump(article_json, f)
def delete_json(self):
    """Delete this article's JSON data file from disk.

    json_path is derived from the article id hash in compute_fields.
    Raises FileNotFoundError if the file is already gone.
    """
    self.json_path.unlink()
@ -126,13 +196,16 @@ class Article(ABC):
config = self.config
html_path = config.html_root / self.html_path
if html_path.exists(): # TODO: does this actually matter
print(f"WARNING: a file already exist for {html_path}. Either the feed has duplicate entries, or something has gone terribly wrong.")
print(
f"WARNING: a file already exist for {html_path}. Either the feed has duplicate entries, or something has gone terribly wrong."
)
else:
html_path.parent.mkdir(parents=True, exist_ok=True)
with html_path.open("w") as f:
f.write(config.article_template.render(self.get_template_dict()))
# set accessed date to update time, modified to publication time
os.utime(html_path, (max(self.updated, self.updated), self.published))
def delete_html(self, ignore_deleted=False):
# Delete a HTML file for a JSON object
html_path = self.config.html_root / self.html_path
@ -142,9 +215,11 @@ class Article(ABC):
def write(self):
    """Persist the article to disk: the JSON data file first, then the HTML file."""
    self.write_json()
    self.write_html()
def delete(self):
    """Remove the article's files from disk.

    The HTML file is removed first (tolerating an already-missing file via
    ignore_deleted=True), then the JSON data file.
    """
    self.delete_html(ignore_deleted=True)
    self.delete_json()
def regenerate(self):
self.delete() # paths might change so we preemptively remove the old file
self.compute_fields() # recompute formatted datetime & paths from the current configuration
@ -154,6 +229,7 @@ class Article(ABC):
"""Returns true if the article is different from a previous version in a way that would require regeneration"""
return old_article.get_template_dict() != self.get_template_dict()
class FileArticle(Article):
def __init__(self, config: Config, json_path: Path) -> Article:
self.config = config

View file

@ -7,6 +7,7 @@ from feather.config import Config
from feather.client import GReaderSession, TTRSession, ClientSession
from feather.data import FileArticle
class FeatherApp:
config: Config
@ -15,6 +16,7 @@ class FeatherApp:
self._client_session = None
_client_session: ClientSession
def get_client_session(self) -> ClientSession:
"""Connect to the server and return a ClientSession object; return an existing ClientSession if we are already connected"""
if not self._client_session:
@ -25,7 +27,9 @@ class FeatherApp:
elif api == "ttrss":
self._client_session = TTRSession(config)
else:
raise ValueError(f"{api} server type is invalid; must be ttrss or googlereader")
raise ValueError(
f"{api} server type is invalid; must be ttrss or googlereader"
)
return self._client_session
def remove_empty_categories(self):
@ -33,10 +37,12 @@ class FeatherApp:
config = self.config
html_root = config.html_root
removed_directories = set()
for (dirpath, dirnames, filenames) in html_root.walk(top_down=False):
for dirpath, dirnames, filenames in html_root.walk(top_down=False):
if dirpath != html_root:
is_empty = len(filenames) == 0
if is_empty and len(dirnames) > 0: # some subdirectories may have been removed in an earlier iteration
if (
is_empty and len(dirnames) > 0
): # some subdirectories may have been removed in an earlier iteration
for subdirname in dirnames:
if dirpath / subdirname not in removed_directories:
is_empty = False
@ -51,7 +57,9 @@ class FeatherApp:
client_session = self.get_client_session()
if config.update_lock.exists():
print("The previous synchronization was aborted, not marking any article as read/unread in order to avoid collateral damage")
print(
"The previous synchronization was aborted, not marking any article as read/unread in order to avoid collateral damage"
)
return
marked_as_read, marked_as_unread = 0, 0
@ -69,9 +77,13 @@ class FeatherApp:
marked_as_unread += 1
for i in range(0, len(to_mark_as_read), config.articles_per_query):
client_session.set_read_flag(to_mark_as_read[i:i+config.articles_per_query], True)
client_session.set_read_flag(
to_mark_as_read[i : i + config.articles_per_query], True
)
for i in range(0, len(to_mark_as_unread), config.articles_per_query):
client_session.set_read_flag(to_mark_as_unread[i : i + config.articles_per_query], False)
client_session.set_read_flag(
to_mark_as_unread[i : i + config.articles_per_query], False
)
print(f"Marked {marked_as_read} articles as read, {marked_as_unread} unread")
@ -92,7 +104,12 @@ class FeatherApp:
remaining, continuation = True, 0
while remaining:
articles = client_session.get_articles_in_category(category, limit=config.articles_per_query, continuation=continuation, unread_only=config.only_sync_unread_articles)
articles = client_session.get_articles_in_category(
category,
limit=config.articles_per_query,
continuation=continuation,
unread_only=config.only_sync_unread_articles,
)
if len(articles) >= config.articles_per_query:
continuation += len(articles)
else:
@ -118,7 +135,9 @@ class FeatherApp:
FileArticle(config, article_path).delete()
removed_articles += 1
print(f"Synchronization successful ({new_articles} new articles, {updated_articles} updated, {removed_articles} removed)")
print(
f"Synchronization successful ({new_articles} new articles, {updated_articles} updated, {removed_articles} removed)"
)
config.update_lock.unlink()
def synchronize(self):
@ -144,20 +163,26 @@ class FeatherApp:
while True:
self.synchronize_local_changes()
await asyncio.sleep(self.config.daemon_sync_up_every)
async def daemon_sync_down_loop(self):
    """Daemon task: pull remote changes forever.

    Runs synchronize_remote_changes, then sleeps for the configured
    daemon.sync_down_every interval before repeating. Cancelled externally
    (e.g. by the SIGTERM handler installed in daemon()).
    """
    while True:
        self.synchronize_remote_changes()
        await asyncio.sleep(self.config.daemon_sync_down_every)
async def daemon(self):
"""Start the synchronization daemon"""
config = self.config
print(f"Started in daemon mode; changes will be downloaded from the server every {config.daemon_sync_down_every}s and uploaded every {config.daemon_sync_up_every}s")
print(
f"Started in daemon mode; changes will be downloaded from the server every {config.daemon_sync_down_every}s and uploaded every {config.daemon_sync_up_every}s"
)
async with asyncio.TaskGroup() as tg:
tup = tg.create_task(self.daemon_sync_up_loop())
tdown = tg.create_task(self.daemon_sync_down_loop())
def cancel_tasks():
tup.cancel()
tdown.cancel()
asyncio.get_running_loop().add_signal_handler(signal.SIGTERM, cancel_tasks)
def regenerate_files(self):
@ -172,4 +197,3 @@ class FeatherApp:
for json_path in config.json_root.glob("*.json"):
FileArticle(config, json_path).delete()
self.remove_empty_categories()