mirror of https://codeberg.org/Reuh/feather.git synced 2025-10-27 18:19:32 +00:00

refactor: code comments & cleaning

Étienne Fildadut 2025-10-11 17:07:34 +02:00
parent 4438c48631
commit 960e06252e
3 changed files with 72 additions and 50 deletions

View file

@@ -148,7 +148,7 @@ class TTRSession(ClientSession):
         self.ttrss.login()
         self.feeds = {}

-    def set_unread(self, article_ids: list[ArticleId], read: bool = True):
+    def set_read_flag(self, article_ids: list[ArticleId], read: bool = True):
         if read:
             self.ttrss.mark_read(article_ids)
         else:

View file

@@ -12,27 +12,33 @@ from hashlib import sha256
 from feather.config import Config


-def escape_filename(config, filename):
-    return filename.translate(config.filename_translation)
-
-
-def truncate_filename(config, filename):
+def sanitize_filename(
+    config: Config, filename: str, insert_before_suffix: str = ""
+) -> str:
+    """Escape invalid characters and truncate the filename as per the configuration.
+    This operates on a single filename, not a path.
+    (insert_before_suffix will be inserted between the stem and suffix, and is assumed to not need escaping)"""
+    filename = filename.translate(config.filename_translation)
     max_filename_length = config.max_filename_length
-    filename_utf8 = filename.encode("utf-8")
-    if len(filename_utf8) <= max_filename_length:
-        return filename
+    filename_len = len(filename.encode("utf-8"))
+    insert_before_suffix_len = len(insert_before_suffix.encode("utf-8"))
+    if filename_len + insert_before_suffix_len <= max_filename_length:
+        path = Path(filename)
+        return f"{path.stem}{insert_before_suffix}{path.suffix}"
     else:
         suffix = Path(filename).suffix
-        max_basename_length = max_filename_length - len(suffix.encode("utf-8"))
+        max_stem_length = (
+            max_filename_length - insert_before_suffix_len - len(suffix.encode("utf-8"))
+        )
         cutoff = len(
-            filename.encode("utf-8")[:max_basename_length].decode(
-                "utf-8", errors="ignore"
-            )
+            filename.encode("utf-8")[:max_stem_length].decode("utf-8", errors="ignore")
         )
-        return filename[:cutoff] + "" + suffix
+        return filename[:cutoff] + "" + insert_before_suffix + suffix


-def format_datetime(config, timestamp):
+def format_datetime(config: Config, timestamp: int) -> str:
+    """Format a timestamp according to the configuration"""
     return datetime.fromtimestamp(timestamp, config.timezone).strftime(
         config.time_format
     )
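Note on the truncation step above: the limit is counted in UTF-8 bytes, not characters, so the byte slice may cut a multi-byte character in half; decoding with errors="ignore" then drops the broken tail, and the cutoff always lands on a character boundary. A minimal standalone illustration of that idea (example string and length only, not feather's real config):

    # "é" is two bytes in UTF-8, so a byte-level cut can land in the middle of it.
    name = "café-article.html"
    chunk = name.encode("utf-8")[:5]               # b'caf\xc3' -- the cut splits "é"
    print(chunk.decode("utf-8", errors="ignore"))  # -> "caf", the dangling byte is dropped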
@@ -71,12 +77,12 @@ type ArticleId = int | str
 class Article(ABC):
     config: Config
-    json_path: Path
+    json_path: Path  # JSON path

     # fields serialized into the JSON file #
     # no default value
-    id: ArticleId  # article id
+    id: ArticleId  # article unique id
     category: Category  # feed category

     # no default value, computed by compute_fields
     published_formatted: str  # article publication time (text)
@@ -99,26 +105,26 @@ class Article(ABC):
     language: str = ""  # article language
     image_url: str = ""  # article main image

-    def get_html_path(self):
+    def _get_html_path(self):
         config = self.config
-        category_directory = config.html_root
+        # Category directory path
+        category_directory = self.config.html_root
         for category in self.category.parents:
-            category_directory /= escape_filename(
+            category_directory /= sanitize_filename(
                 config, config.article_category_template.render(category.asdict())
             )
-        category_directory /= escape_filename(
+        category_directory /= sanitize_filename(
             config, config.article_category_template.render(self.category.asdict())
         )
-        html_name = truncate_filename(
+        # Filename
+        html_name = sanitize_filename(
             config,
-            escape_filename(
-                config,
-                config.article_filename_template.render(self.get_template_dict()),
-            ),
+            config.article_filename_template.render(self._get_template_dict()),
         )
-        return category_directory / html_name
+        html_path = category_directory / html_name
+        return html_path.relative_to(config.html_root)

     def compute_fields(self):
         config = self.config
@@ -158,6 +164,7 @@ class Article(ABC):
         return d

     def write_json(self):
+        """Write the JSON file associated with this article. Error if it already exists."""
         stored_fields = (
             "id",
             "unread",
@@ -189,45 +196,58 @@ class Article(ABC):
             json.dump(article_json, f)

     def delete_json(self):
+        """Delete the JSON file associated with this article."""
         self.json_path.unlink()

+    def has_html(self):
+        """Check if the HTML file associated with the article exists on disk."""
+        if self.html_path is None:
+            return False
+        else:
+            html_path = self.config.html_root / self.html_path
+            return html_path.exists()
+
     def write_html(self):
         # Write HTML file for a JSON object
         config = self.config
         html_path = config.html_root / self.html_path
-        if html_path.exists():  # TODO: does this actually matter
-            print(
-                f"WARNING: a file already exist for {html_path}. Either the feed has duplicate entries, or something has gone terribly wrong."
+        if html_path.exists():
+            raise Exception(
+                f"Unexpectedly tried to overwrite article file for {html_path}. Either the feed has duplicate entries, or something has gone terribly wrong."
             )
         else:
             html_path.parent.mkdir(parents=True, exist_ok=True)
             with html_path.open("w") as f:
-                f.write(config.article_template.render(self.get_template_dict()))
+                f.write(config.article_template.render(self._get_template_dict()))
             # set accessed date to update time, modified to publication time
             os.utime(html_path, (max(self.updated, self.updated), self.published))

     def delete_html(self, ignore_deleted=False):
+        """Delete the HTML file associated with this article."""
         # Delete a HTML file for a JSON object
         html_path = self.config.html_root / self.html_path
         if not ignore_deleted or html_path.exists():
             html_path.unlink()

     def write(self):
-        self.write_json()
+        """Write all the files associated with this article to disk."""
         self.write_html()
+        self.write_json()

     def delete(self):
+        """Delete all the files associated with this article."""
         self.delete_html(ignore_deleted=True)
         self.delete_json()

     def regenerate(self):
+        """Delete and rewrite all the files associated with this article using the latest configuration."""
         self.delete()  # paths might change so we preemptively remove the old file
         self.compute_fields()  # recompute formatted datetime & paths from the current configuration
         self.write()  # rewrite JSON & HTML

     def was_updated(self, old_article: Article):
         """Returns true if the article is different from a previous version in a way that would require regeneration"""
-        return old_article.get_template_dict() != self.get_template_dict()
+        return old_article._get_template_dict() != self._get_template_dict()


 class FileArticle(Article):
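A side note on the os.utime call kept in write_html above: it takes a pair (atime, mtime) in seconds since the epoch, so the article file's access time ends up as the update time and its modification time as the publication time. A small standalone sketch (temporary file and made-up timestamps, not real article data):

    import os
    import tempfile

    published, updated = 1_700_000_000, 1_700_050_000  # assumed example timestamps
    with tempfile.NamedTemporaryFile(delete=False) as f:
        path = f.name
    os.utime(path, (updated, published))  # (atime, mtime), mirroring write_html
    print(os.stat(path).st_mtime)         # -> 1700000000.0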

View file

@@ -4,7 +4,7 @@ import asyncio
 import signal

 from feather.config import Config
-from feather.client import GReaderSession, TTRSession, ClientSession
+from feather.client import GReaderSession, TTRSession, ClientSession, ArticleId
 from feather.data import FileArticle
@@ -32,6 +32,12 @@ class FeatherApp:
             )
         return self._client_session

+    def iter_articles(self):
+        """Iterate over all the articles in local storage"""
+        config = self.config
+        for json_path in config.json_root.glob("*.json"):
+            yield FileArticle(config, json_path)
+
     def remove_empty_categories(self):
         """Remove empty directories in the HTML directory"""
         config = self.config
@@ -65,10 +71,8 @@
         marked_as_read, marked_as_unread = 0, 0
         to_mark_as_read = []
         to_mark_as_unread = []
-        for json_path in config.json_root.glob("*.json"):
-            article = FileArticle(config, json_path)
-            html_path = config.html_root / article.html_path
-            if not html_path.exists():
+        for article in self.iter_articles():
+            if not article.has_html():
                 if article.unread:
                     to_mark_as_read.append(article.id)
                     marked_as_read += 1
@@ -96,7 +100,7 @@
         print("Synchronizing with server...")
         new_articles, updated_articles = 0, 0
-        grabbed_article_paths = set()
+        grabbed_article_paths: set[ArticleId] = set()

         categories = client_session.list_categories()
         for category in categories:
@@ -116,8 +120,8 @@
                 remaining = False
                 for article in articles:
+                    grabbed_article_paths.add(article.id)
                     json_path = article.json_path
-                    grabbed_article_paths.add(json_path)
                     if not json_path.exists():
                         article.write()
                         new_articles += 1
@@ -130,9 +134,9 @@
         # Remove articles that we didn't get from the server but are in the JSON directory
         removed_articles = 0
-        for article_path in config.json_root.glob("*.json"):
-            if article_path not in grabbed_article_paths:
-                FileArticle(config, article_path).delete()
+        for article in self.iter_articles():
+            if article.id not in grabbed_article_paths:
+                article.delete()
                 removed_articles += 1

         print(
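The pruning above now keys on article ids instead of JSON paths: every id returned by the server during this sync is collected in grabbed_article_paths, and any local article whose id never showed up gets deleted. The same membership check with plain values (illustrative ids only, not real feather data):

    grabbed = {"a1", "a2"}            # ids seen from the server this sync
    local = ["a1", "a2", "stale"]     # ids of articles already on disk
    removed = [i for i in local if i not in grabbed]
    print(removed)                    # -> ['stale']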
@@ -187,13 +191,11 @@

     def regenerate_files(self):
         """Regenerate all local files using local data only"""
-        config = self.config
-        for json_path in config.json_root.glob("*.json"):
-            FileArticle(config, json_path).regenerate()
+        for article in self.iter_articles():
+            article.regenerate()

     def clear_data(self):
         """Delete all local data"""
-        config = self.config
-        for json_path in config.json_root.glob("*.json"):
-            FileArticle(config, json_path).delete()
+        for article in self.iter_articles():
+            article.delete()
         self.remove_empty_categories()