feather/src/feather/app.py
"""Main feather application"""
import signal
import asyncio
from asyncio import Event
from typing import Iterable
from watchfiles import awatch
from pathlib import Path
from datetime import datetime
from feather.config import Config
from feather.client import GReaderSession, TTRSession, ClientSession, Article, ArticleId
from feather.data import FileArticle, InvalidArticleFileError
async def sleep_min_max(min_sleep: float, max_sleep: float, stop_sleep_event: Event):
"""Always sleep for at least min_sleep, and eventually up to max_sleep unless stop_sleep_event is never set during the sleep."""
stop_sleep_event.clear()
await asyncio.sleep(min_sleep)
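    # The minimum sleep is unconditional; the remaining time up to max_sleep is
    # cut short as soon as stop_sleep_event is set.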
await asyncio.wait(
(
asyncio.create_task(asyncio.sleep(max_sleep - min_sleep)),
asyncio.create_task(stop_sleep_event.wait()),
),
return_when=asyncio.FIRST_COMPLETED,
)
class FeatherApp:
def __init__(self, config: Config):
self.config: Config = config
        self._client_session: ClientSession | None = None
def get_client_session(self) -> ClientSession:
"""Connect to the server and return a ClientSession object; return an existing ClientSession if we are already connected"""
if not self._client_session:
config = self.config
api = config.server_api
print(
f"Connecting to {config.server_user}@{config.server_url} ({api} API)..."
)
if api == "googlereader":
self._client_session = GReaderSession(config)
elif api == "ttrss":
self._client_session = TTRSession(config)
else:
raise ValueError(
f"{api} server type is invalid; must be ttrss or googlereader"
)
return self._client_session
def iter_articles(self) -> Iterable[Article]:
"""Iterate over all the articles in local storage"""
config = self.config
for json_path in config.json_root.glob("*.json"):
try:
yield FileArticle(config, json_path)
except InvalidArticleFileError:
print(
f"WARNING: Skipping corrupted article file {json_path}. Delete this file and its associated HTML file (if it exists) to resolve this warning."
)
def remove_empty_categories(self):
"""Remove empty directories in the HTML directory"""
config = self.config
html_root = config.html_root
removed_directories = set()
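        # Walk bottom-up so a parent directory can be removed once all of its
        # subdirectories have been removed in earlier iterations.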
for dirpath, dirnames, filenames in html_root.walk(top_down=False):
if dirpath != html_root:
is_empty = len(filenames) == 0
if (
is_empty and len(dirnames) > 0
): # some subdirectories may have been removed in an earlier iteration
for subdirname in dirnames:
if dirpath / subdirname not in removed_directories:
is_empty = False
break
if is_empty:
dirpath.rmdir()
removed_directories.add(dirpath)
def toggle_read_flag_for_deleted(self):
"""Mark articles that are in the JSON directory but with missing HTML file as read/unread on the server"""
config = self.config
# gather articles to mark as read/unread
marked_as_read, marked_as_unread = 0, 0
to_mark_as_read = []
to_mark_as_unread = []
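        # An unread article whose HTML file is gone gets marked as read; a read
        # article whose HTML presence no longer matches the write_read_articles
        # setting gets marked as unread again.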
for article in self.iter_articles():
has_html = article.has_html()
if article.unread and not has_html:
to_mark_as_read.append(article)
marked_as_read += 1
elif not article.unread and (
(config.write_read_articles and not has_html)
or (not config.write_read_articles and has_html)
):
to_mark_as_unread.append(article)
marked_as_unread += 1
if len(to_mark_as_read) == len(to_mark_as_unread) == 0:
return # nothing to do
# change read state on server
client_session = self.get_client_session()
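        # Push the read-state changes to the server in batches of
        # articles_per_query articles.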
to_mark_as_read_id = [article.id for article in to_mark_as_read]
for i in range(0, len(to_mark_as_read_id), config.articles_per_query):
client_session.set_read_flag(
to_mark_as_read_id[i : i + config.articles_per_query], True
)
to_mark_as_unread_id = [article.id for article in to_mark_as_unread]
for i in range(0, len(to_mark_as_unread_id), config.articles_per_query):
client_session.set_read_flag(
to_mark_as_unread_id[i : i + config.articles_per_query], False
)
# regenerate local file with new read/unread state
for article in to_mark_as_read:
article.unread = False
article.regenerate()
for article in to_mark_as_unread:
article.unread = True
article.regenerate()
print(f"Marked {marked_as_read} articles as read, {marked_as_unread} unread")
def synchronize_with_server(self):
"""Synchronize articles from the server, generating and deleting JSON and HTML files accordingly"""
config = self.config
client_session = self.get_client_session()
print("Synchronizing from server...")
new_articles, updated_articles = 0, 0
grabbed_article_ids: set[ArticleId] = set()
categories = client_session.list_categories()
for category in categories:
print(f" Synchronizing category {category.title}")
remaining, continuation = True, 0
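            # Page through the category with a continuation offset until the
            # server returns fewer articles than articles_per_query.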
while remaining:
articles = client_session.get_articles_in_category(
category,
limit=config.articles_per_query,
continuation=continuation,
unread_only=config.only_sync_unread_articles,
)
if len(articles) >= config.articles_per_query:
continuation += len(articles)
else:
remaining = False
for article in articles:
grabbed_article_ids.add(article.id)
json_path = article.json_path
if not json_path.exists():
article.write()
new_articles += 1
else:
try:
old_article = FileArticle(config, json_path)
if article.was_updated(old_article):
old_article.delete()
article.write()
updated_articles += 1
except InvalidArticleFileError:
print(
f" WARNING: Skipping corrupted article file {json_path}. Delete this file and its associated HTML file (if it exists) to resolve this warning."
)
# Remove or mark-as-read articles that we didn't get from the server but are in the JSON directory
removed_articles = 0
article_cutoff_timestamp = (
datetime.now().timestamp() - config.keep_read_articles_for
)
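        # When only unread articles are synced, read articles are kept locally
        # for keep_read_articles_for seconds after their last write before
        # being deleted.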
for article in self.iter_articles():
if article.id not in grabbed_article_ids:
# we only sync unread: articles we didn't get from the server were read or purged
if config.only_sync_unread_articles:
if article.last_write < article_cutoff_timestamp:
article.delete()
removed_articles += 1
elif article.unread:
article.unread = False
article.regenerate()
updated_articles += 1
# we sync all articles: articles we didn't get from the server were purged
else:
article.delete()
removed_articles += 1
print(
f"Synchronization successful ({new_articles} new articles, {updated_articles} updated, {removed_articles} removed)"
)
def synchronize(self):
"""Do a full feather update"""
self.toggle_read_flag_for_deleted()
self.synchronize_with_server()
if self.config.hide_empty_categories:
self.remove_empty_categories()
def synchronize_local_changes(self):
"""Upload local changes (read articles) to the server"""
self.toggle_read_flag_for_deleted()
if self.config.hide_empty_categories:
self.remove_empty_categories()
def synchronize_remote_changes(self):
"""Download remote changes (new articles, articles read from another device) from the server"""
self.synchronize_with_server()
if self.config.hide_empty_categories:
self.remove_empty_categories()
async def daemon_sync_up_loop(self, stop_sleep_event: Event):
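        """Periodically upload local changes (read/unread flags) to the server; setting stop_sleep_event shortens the wait before the next upload."""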
config = self.config
while True:
self.synchronize_local_changes()
await sleep_min_max(
config.daemon_sync_up_every_when_used,
config.daemon_sync_up_every,
stop_sleep_event,
)
async def daemon_sync_down_loop(self, stop_sleep_event: Event):
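        """Periodically download remote changes from the server; setting stop_sleep_event shortens the wait before the next download."""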
config = self.config
while True:
self.synchronize_remote_changes()
await sleep_min_max(
config.daemon_sync_down_every_when_used,
config.daemon_sync_down_every,
stop_sleep_event,
)
async def daemon_watch_loop(self, sync_up_event: Event, sync_down_event: Event):
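        """Watch the HTML directory and request an earlier synchronization when HTML files or category directories change."""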
def filter_html(change, path_str):
path = Path(path_str)
return path.is_dir() or path.suffix == ".html"
async for changes in awatch(
self.config.html_root,
watch_filter=filter_html,
step=250,
            debounce=3600000,  # large debounce so that feather regenerate and other commands don't trigger a flood of events while they run
recursive=True,
):
sync_up_event.set()
sync_down_event.set()
async def daemon(self):
"""Start the synchronization daemon"""
config = self.config
if config.daemon_watch_files:
print(
f"Started in daemon mode; changes will be downloaded from the server every {config.daemon_sync_down_every_when_used} to {config.daemon_sync_down_every} seconds and uploaded every {config.daemon_sync_up_every_when_used} to {config.daemon_sync_up_every} seconds"
)
else:
print(
f"Started in daemon mode; changes will be downloaded from the server every {config.daemon_sync_down_every}s and uploaded every {config.daemon_sync_up_every} seconds"
)
async with asyncio.TaskGroup() as tg:
stop_sleep_up_event, stop_sleep_down_event = Event(), Event()
tasks = [
tg.create_task(self.daemon_sync_up_loop(stop_sleep_up_event)),
tg.create_task(self.daemon_sync_down_loop(stop_sleep_down_event)),
]
if config.daemon_watch_files:
tasks.append(
tg.create_task(
self.daemon_watch_loop(
stop_sleep_up_event, stop_sleep_down_event
)
),
)
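            # Stop cleanly on SIGTERM by cancelling all daemon tasks.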
def cancel_tasks():
for task in tasks:
task.cancel()
asyncio.get_running_loop().add_signal_handler(signal.SIGTERM, cancel_tasks)
def regenerate_files(self):
"""Regenerate all local files using local data only"""
for article in self.iter_articles():
article.regenerate()
self.remove_empty_categories()
def clear_data(self):
"""Delete all local data"""
for article in self.iter_articles():
article.delete()
self.remove_empty_categories()
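# Minimal usage sketch (for illustration only; how a Config instance is
# obtained depends on feather.config and is an assumption here):
#
#     config = Config(...)          # load/build the configuration (assumption)
#     app = FeatherApp(config)
#     app.synchronize()             # one-shot synchronization
#     asyncio.run(app.daemon())     # or run the synchronization daemon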