mirror of
				https://codeberg.org/Reuh/feather.git
				synced 2025-10-27 18:19:32 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			236 lines
		
	
	
	
		
			9.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			236 lines
		
	
	
	
		
			9.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import os
 | |
| import re
 | |
| import json
 | |
| import google_reader
 | |
| import tomllib
 | |
| import sys
 | |
| from datetime import datetime
 | |
| from zoneinfo import ZoneInfo
 | |
| from pathlib import Path
 | |
| from hashlib import sha256
 | |
| from jinja2 import Template
 | |
| 
 | |
| #%% Configuration
 | |
| 
 | |
class Config:
    # Loads feather's configuration by merging three sources, in priority
    # order: environment variables (CATEGORY_FIELD), the user config file
    # ($CONFIG_PATH or config.toml), and the shipped config.default.toml.
    def __init__(self):
        """Read the configuration files and populate typed config attributes.

        Exits the process with status 1 when a mandatory field (one called
        with can_default=False) is missing from both the user config file and
        the environment.
        """
        with open("config.default.toml", "rb") as f:
            default_config = tomllib.load(f)

        # The user configuration path can be overridden via $CONFIG_PATH.
        config_path = os.environ.get("CONFIG_PATH") or "config.toml"
        with open(config_path, "rb") as f:
            config = tomllib.load(f)

        def get_config(category, field, can_default=True):
            # Look one field up: environment variable first, then the user
            # config file, then (if allowed) the shipped defaults.
            env_name = f"{category.upper()}_{field.upper()}"
            c = config.get(category, {})
            if env_name in os.environ:
                return os.environ[env_name]
            elif field in c:
                return c[field]
            elif can_default:
                return default_config[category][field]
            else:
                print(f"Error while loading configuration: {category}.{field} not found in {config_path} nor in environment variable {env_name}", file=sys.stderr)
                exit(1)

        # Get config fields
        self.html_root: Path = Path(get_config("directories", "reader"))
        self.json_root: Path = Path(get_config("directories", "data"))
        self.server_url: str = str(get_config("server", "url", False))
        self.server_user: str = str(get_config("server", "user", False))
        self.server_password: str = str(get_config("server", "password", False))
        self.items_per_query: int = int(get_config("server", "items_per_request"))
        self.timezone: ZoneInfo = ZoneInfo(get_config("time", "timezone"))
        self.time_format: str = str(get_config("time", "format"))
        # HTML item bodies are autoescaped; filenames are rendered verbatim.
        self.item_template: Template = Template(get_config("html", "template"), autoescape=True)
        self.item_filename_template: Template = Template(get_config("html", "filename_template"), autoescape=False)
        self.max_filename_length: int = int(get_config("html", "max_filename_length"))

        # Computed config fields
        # Presence of this lock file marks an in-progress (or aborted) sync.
        self.update_lock = self.json_root / "update.lock"

        # Create missing directories
        self.html_root.mkdir(exist_ok=True)
        self.json_root.mkdir(exist_ok=True)
 | |
| 
 | |
| #%% Interaction with server
 | |
| 
 | |
# Extracts the folder name from a tag id of the form "user/<id>/label/<name>".
label_name = re.compile("user/.*/label/(.*)")

class ClientSession:
    """Authenticated session against a Google Reader compatible server."""

    client: google_reader.Client
    auth_token: str
    csrf_token: str

    def __init__(self, config: Config):
        # Logging in contacts the server immediately; both the auth token and
        # the CSRF token are fetched up front and reused for every later call.
        self.client = google_reader.Client(config.server_url)
        self.auth_token = self.client.login(config.server_user, config.server_password)
        self.csrf_token = self.client.get_token(self.auth_token)

    def mark_as_read(self, item_ids):
        # Mark the given items as read by adding the built-in "read" stream tag.
        self.client.edit_tags(self.auth_token, self.csrf_token, item_ids=item_ids, add_tags=[google_reader.STREAM_READ])

    def list_folders(self):
        # Return a list of (folder_name, folder_id) tuples for every server
        # tag of type "folder".
        folders = [tag for tag in self.client.list_tags(self.auth_token) if tag.type == "folder"]
        l = []
        for folder in folders:
            # Fall back to parsing the tag id when no explicit label is set.
            folder_name = folder.label or label_name.search(folder.id).group(1)
            folder_id = folder.id
            l.append((folder_name, folder_id))
        return l

    def get_stream_items_ids(self, *args, **kwargs):
        # Thin wrapper forwarding to the client with the stored auth token.
        return self.client.get_stream_items_ids(self.auth_token, *args, **kwargs)

    def get_stream_items_contents(self, *args, **kwargs):
        # Thin wrapper forwarding to the client with both stored tokens.
        return self.client.get_stream_items_contents(self.auth_token, self.csrf_token, *args, **kwargs)
 | |
| 
 | |
| #%% Regular feather operations
 | |
| 
 | |
def mark_deleted_as_read(config, client_session):
    """Mark items whose HTML file was deleted by the user as read on the server.

    Scans every stored JSON item; when its rendered HTML file is gone, the
    item id is queued to be marked as read on the server and the JSON file is
    removed locally. Does nothing when a previous synchronization was aborted
    (stale lock file), since missing HTML files could then be sync artifacts
    rather than deliberate user deletions.
    """
    if config.update_lock.exists():
        print("The previous synchronization was aborted, not marking any item as read in order to avoid collateral damage")
        return

    marked_as_read = 0
    to_mark_as_read = []
    for stored_item in config.json_root.glob("*.json"):
        # Close the file handle deterministically instead of leaving it to GC.
        with stored_item.open("r") as f:
            item_json = json.load(f)
        html_path = config.html_root / item_json["html_path"]
        if not html_path.exists():
            to_mark_as_read.append(item_json["id"])
            # delete JSON file
            stored_item.unlink()
            marked_as_read += 1

    # Send ids in batches of the configured request size. This was previously
    # hard-coded to 500, which silently disagreed with items_per_query
    # whenever the configuration used a different value.
    batch_size = config.items_per_query
    for i in range(0, len(to_mark_as_read), batch_size):
        client_session.mark_as_read(to_mark_as_read[i:i + batch_size])

    print(f"Marked {marked_as_read} items as read")
 | |
| 
 | |
def escape_filename(filename):
    # Make a string safe to use as a single file name by turning path
    # separators into dashes.
    sanitized = "-".join(filename.split("/"))
    return sanitized
 | |
| 
 | |
def truncate_filename(config, filename):
    """Truncate *filename* so it fits within config.max_filename_length.

    The file extension is preserved and an ellipsis marks the cut. The
    previous version appended '…' without accounting for its width, so
    truncated names ended up one character over the configured maximum.
    """
    max_filename_length = config.max_filename_length
    if len(filename) <= max_filename_length:
        return filename
    suffix = Path(filename).suffix
    # Reserve room for both the ellipsis and the extension.
    max_stem_length = max(max_filename_length - len(suffix) - 1, 0)
    return filename[:max_stem_length] + '…' + suffix
 | |
| 
 | |
def get_html_path(config, item_json):
    # Compute the full path of the HTML file for an item, creating its
    # per-folder directory along the way.
    folder = config.html_root / escape_filename(item_json["folder"])
    folder.mkdir(exist_ok=True)

    # Render the configured filename template, then sanitize and shorten it.
    rendered_name = config.item_filename_template.render(item_json)
    file_name = truncate_filename(config, escape_filename(rendered_name))

    return folder / file_name
 | |
| 
 | |
def format_datetime(config, timestamp):
    # Render a Unix timestamp in the configured timezone and time format.
    local_time = datetime.fromtimestamp(timestamp, config.timezone)
    return local_time.strftime(config.time_format)
 | |
| 
 | |
def synchronize_with_server(config, client_session):
    """Synchronize unread items from the server into JSON and HTML files.

    For every server folder, pages through the unread item ids (using
    continuation tokens), fetches contents for not-yet-stored items, and
    writes one JSON metadata file plus one rendered HTML file per item.
    Items present locally but no longer returned by the server are deleted.
    The lock file is deliberately left behind if this crashes, so that
    mark_deleted_as_read() can detect an aborted run.
    """
    config.update_lock.touch()
    print("Synchronizing with server...")

    new_items = 0
    # A set, not a list: membership is tested once per stored item below,
    # which was accidentally O(n^2) with a list.
    grabbed_item_paths = set()

    folders = client_session.list_folders()
    for (folder_name, folder_id) in folders:
        print(f"  Updating folder {folder_name}")

        def process(item_ids):
            # Fetch contents for one batch of item ids and store new items.
            nonlocal new_items
            if len(item_ids) > 0:
                item_contents = client_session.get_stream_items_contents(item_ids=item_ids)
                for item_content in item_contents.items:
                    item_json = {
                        "id": item_content.id,
                        "folder": folder_name,
                        "title": item_content.title,
                        "published": item_content.published,
                        "published_formatted": format_datetime(config, item_content.published),
                        "updated": item_content.updated,
                        "updated_formatted": format_datetime(config, item_content.updated),
                        "author": item_content.author,
                        "summary": item_content.summary.content,
                        "content": item_content.content.content,
                        "origin_title": item_content.origin.title,
                        "origin_url": item_content.origin.html_url,
                        "canonical_url": item_content.canonical[0].href,
                    }
                    item_json["html_path"] = str(get_html_path(config, item_json).relative_to(config.html_root))

                    # Items are keyed on disk by the SHA-256 of their server id.
                    item_hash = sha256(item_json["id"].encode("utf-8")).hexdigest()
                    json_path = config.json_root / f"{item_hash}.json"
                    grabbed_item_paths.add(json_path)
                    if not json_path.exists():
                        # write JSON
                        with json_path.open("w") as f:
                            json.dump(item_json, f)
                        # write HTML
                        generate_html_for_item(config, item_json)
                        new_items += 1

        # Page through the folder's unread items; the server signals the last
        # page with an empty continuation token.
        continuation = None
        while continuation != '':
            items = client_session.get_stream_items_ids(stream_id=folder_id, exclude_target="user/-/state/com.google/read", limit=config.items_per_query, continuation=continuation)
            item_ids = [item.id for item in items.item_refs]
            process(item_ids)
            continuation = items.continuation

    # Remove items that we didn't get from the server but are in the JSON directory
    removed_items = 0
    for item_path in config.json_root.glob("*.json"):
        if item_path not in grabbed_item_paths:
            # remove HTML
            with item_path.open("r") as f:
                item_json = json.load(f)
            remove_html_for_item(config, item_json, ignore_deleted=True) # ignore if file was deleted by user during sync
            # remove JSON
            item_path.unlink()
            removed_items += 1

    print(f"Synchronization successful ({new_items} new items, {removed_items} removed)")
    config.update_lock.unlink()
 | |
| 
 | |
def generate_html_for_item(config, item_json):
    # Render the HTML file for a stored item. Refuses to overwrite an
    # existing file, which would indicate duplicate entries or a bug.
    html_path = config.html_root / item_json["html_path"]
    if html_path.exists():
        print(f"WARNING: a file already exist for {html_path}. Either the feed has duplicate entries, or something has gone terribly wrong.")
        return
    html_path.write_text(config.item_template.render(item_json))
 | |
| 
 | |
def remove_html_for_item(config, item_json, ignore_deleted=False):
    # Delete the HTML file of an item. With ignore_deleted set, a file that
    # is already gone is silently accepted instead of raising.
    html_path = config.html_root / item_json["html_path"]
    if ignore_deleted and not html_path.exists():
        return
    html_path.unlink()
 | |
| 
 | |
| def remove_empty_html_directories(config):
 | |
|     # Remove empty directories in the HTML directory
 | |
|     html_root = config.html_root
 | |
|     for (dirpath, dirnames, filenames) in html_root.walk(top_down=False):
 | |
|         if dirpath != html_root:
 | |
|             if len(dirnames) == 0 and len(filenames) == 0:
 | |
|                 dirpath.rmdir()
 | |
| 
 | |
def process(config, client_session):
    """Run one complete feather update cycle."""
    # First propagate user deletions to the server, then pull the current
    # item set from it, and finally tidy up directories left empty.
    mark_deleted_as_read(config, client_session)
    synchronize_with_server(config, client_session)
    remove_empty_html_directories(config)
 | |
| 
 | |
| #%% Run feather
 | |
| 
 | |
def main():
    """Entry point: load the configuration, open a server session, and run a
    full update."""
    configuration = Config()
    session = ClientSession(configuration)
    process(configuration, session)
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|     main()
 | |
| 
 |