"""
Photo source provider plugin framework.

Provides the base class, registry, caching layer, and instance manager for
photo source plugins. Concrete providers (local, SFTP, Google Photos, etc.)
implement PhotoProvider and register via @register_provider.

Usage:
    from providers import PhotoProvider, PhotoRef, register_provider, ProviderManager
"""

from __future__ import annotations

import abc
import enum
import hashlib
import io
import json
import random
import shutil
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

from PIL import Image


# ---------------------------------------------------------------------------
# Data types
# ---------------------------------------------------------------------------

@dataclass(frozen=True)
class PhotoRef:
    """Lightweight reference to a photo — no pixel data, just metadata."""

    provider_id: str
    photo_id: str
    display_name: str
    sort_key: str = ""

    @property
    def composite_key(self) -> str:
        """Unique key across all providers: '{provider_id}:{photo_id}'"""
        return f"{self.provider_id}:{self.photo_id}"


class AuthType(enum.Enum):
    """Authentication mechanism a provider type declares via ``auth_type``."""

    NONE = "none"
    CREDENTIALS = "credentials"
    OAUTH2 = "oauth2"
    CREDENTIALS_2FA = "credentials_2fa"


@dataclass
class ProviderHealth:
    """Result of ``PhotoProvider.health_check``."""

    status: str = "ok"  # ok | degraded | error | auth_required
    message: str = ""
    last_successful: str | None = None


# ---------------------------------------------------------------------------
# Base class
# ---------------------------------------------------------------------------

class PhotoProvider(abc.ABC):
    """Base class for all photo source plugins."""

    # Subclasses must set these as class attributes
    provider_type: str = ""
    display_name: str = ""
    auth_type: AuthType = AuthType.NONE
    config_schema: dict = {}

    def __init__(self, instance_id: str, config: dict):
        """Store the instance id and the provider-specific config dict."""
        self.instance_id = instance_id
        self.config = config

    # -- Lifecycle --

    def startup(self) -> None:
        """Called once when the provider instance is created.

        Use for connection setup, session restoration, etc.
        """

    def shutdown(self) -> None:
        """Called on server shutdown or provider removal.

        Close connections, persist sessions, etc.
        """

    def health_check(self) -> ProviderHealth:
        """Return current connectivity/auth status."""
        return ProviderHealth()

    # -- Required abstract methods --

    @abc.abstractmethod
    def list_photos(self) -> list[PhotoRef]:
        """Return all available photos from this source."""
        ...

    @abc.abstractmethod
    def get_photo(self, photo_id: str) -> Image.Image:
        """Fetch and return the full-resolution image as an RGB PIL Image."""
        ...

    # -- Optional capabilities --

    def supports_upload(self) -> bool:
        """Return True if ``upload_photo`` is implemented. Default: False."""
        return False

    def upload_photo(self, filename: str, data: bytes) -> PhotoRef:
        """Store *data* under *filename*; the base class raises NotImplementedError."""
        raise NotImplementedError

    def supports_delete(self) -> bool:
        """Return True if ``delete_photo`` is implemented. Default: False."""
        return False

    def delete_photo(self, photo_id: str) -> None:
        """Delete the given photo; the base class raises NotImplementedError."""
        raise NotImplementedError

    def get_photo_size(self, photo_id: str) -> int | None:
        """Return file size in bytes, or None if unknown."""
        return None

    # -- Cache control (override for provider-specific TTLs) --

    def list_cache_ttl(self) -> int:
        """Seconds to cache the photo list. Default 300s (5 min)."""
        return 300

    def photo_cache_ttl(self) -> int | None:
        """Seconds to cache full images on disk. None = no caching.

        Local providers return None (direct read). Remote providers
        should return a positive value to avoid re-downloading.
        """
        return None

    # -- Auth hooks (override for providers that need auth) --

    def auth_start(self, callback_url: str) -> dict:
        """Begin OAuth flow. Return {"redirect_url": "..."}."""
        raise NotImplementedError

    def auth_callback(self, params: dict) -> dict:
        """Handle OAuth callback. Return {"status": "ok"} or error."""
        raise NotImplementedError

    def auth_submit_code(self, code: str) -> dict:
        """Submit 2FA/verification code. Return {"status": "ok"} or error."""
        raise NotImplementedError

    def get_auth_state(self) -> dict:
        """Return current auth status for UI display."""
        return {"authenticated": True}


# ---------------------------------------------------------------------------
# Registry
# ---------------------------------------------------------------------------

_PROVIDER_REGISTRY: dict[str, type[PhotoProvider]] = {}


def register_provider(cls: type[PhotoProvider]) -> type[PhotoProvider]:
    """Class decorator: registers a PhotoProvider subclass by its provider_type.

    Raises:
        ValueError: if the class leaves ``provider_type`` empty.
    """
    if not cls.provider_type:
        raise ValueError(f"{cls.__name__} must set provider_type")
    _PROVIDER_REGISTRY[cls.provider_type] = cls
    return cls


def get_provider_class(provider_type: str) -> type[PhotoProvider]:
    """Return the class registered for *provider_type*.

    Raises:
        ValueError: if no class is registered under that type.
    """
    if provider_type not in _PROVIDER_REGISTRY:
        raise ValueError(f"Unknown provider type: {provider_type!r}. "
                         f"Available: {list(_PROVIDER_REGISTRY.keys())}")
    return _PROVIDER_REGISTRY[provider_type]


def available_provider_types() -> list[dict]:
    """Return metadata about all registered provider types (for the web UI)."""
    return [
        {
            "type": cls.provider_type,
            "display_name": cls.display_name,
            "auth_type": cls.auth_type.value,
            "config_schema": cls.config_schema,
        }
        for cls in _PROVIDER_REGISTRY.values()
    ]


# ---------------------------------------------------------------------------
# Caching layer
# ---------------------------------------------------------------------------

# Image modes Pillow's JPEG encoder writes without conversion. Anything else
# (RGBA, P, LA, ...) makes Image.save(format="JPEG") raise OSError.
_JPEG_MODES = ("1", "L", "RGB", "RGBX", "CMYK", "YCbCr")


def _jpeg_ready(img: Image.Image) -> Image.Image:
    """Return *img* unchanged if it can be saved as JPEG, else an RGB copy."""
    return img if img.mode in _JPEG_MODES else img.convert("RGB")


def _make_thumbnail(img: Image.Image, size: int = 300) -> bytes:
    """Return JPEG bytes of *img* scaled to fit within *size* x *size*.

    The input image is never modified.
    """
    thumb = _jpeg_ready(img)
    if thumb is img:
        # thumbnail() resizes in place, so work on a copy of the caller's image.
        thumb = img.copy()
    thumb.thumbnail((size, size), Image.LANCZOS)
    buf = io.BytesIO()
    thumb.save(buf, format="JPEG", quality=75)
    return buf.getvalue()


def _safe_filename(s: str) -> str:
    """Convert an arbitrary string to a safe filename for cache keys."""
    return hashlib.sha256(s.encode()).hexdigest()[:24]


def _instance_prefix(instance_id: str) -> str:
    """Return the filename prefix shared by all cache files of one instance.

    The trailing '-' never occurs in the hex digest, so one instance's
    prefix cannot be a partial match of another's.
    """
    return _safe_filename(f"{instance_id}:")[:12] + "-"


def _cache_stem(instance_id: str, photo_id: str) -> str:
    """Return the cache filename stem (no extension) for one photo.

    The stem starts with ``_instance_prefix(instance_id)`` so that
    ``ProviderCache.clear`` can find every file belonging to an instance.
    """
    return _instance_prefix(instance_id) + _safe_filename(f"{instance_id}:{photo_id}")


def _atomic_write_bytes(path: Path, data: bytes) -> None:
    """Write *data* to *path* via a temp file + rename.

    Readers therefore never observe a partially written file.
    """
    tmp = path.with_name(path.name + ".tmp")
    try:
        tmp.write_bytes(data)
        tmp.replace(path)
    finally:
        # No-op after a successful rename; removes the leftover on failure.
        tmp.unlink(missing_ok=True)


class ProviderCache:
    """Caching layer for photo lists, thumbnails, and full images.

    Photo lists are cached in memory with a per-provider TTL. Thumbnails and
    full images are cached as JPEG files under ``cache_dir``; their names
    start with a per-instance prefix so ``clear`` can remove them.
    """

    def __init__(self, cache_dir: Path):
        """Set up cache paths under *cache_dir* (created lazily on first write)."""
        self._cache_dir = cache_dir
        self._thumb_dir = cache_dir / "thumbs"
        self._photo_dir = cache_dir / "photos"
        # In-memory: instance_id -> (expiry_timestamp, photos)
        self._list_cache: dict[str, tuple[float, list[PhotoRef]]] = {}

    def get_photo_list(self, provider: PhotoProvider) -> list[PhotoRef]:
        """Return the provider's photo list, re-listing once the TTL has expired."""
        iid = provider.instance_id
        now = time.time()
        cached = self._list_cache.get(iid)
        if cached and now < cached[0]:
            return cached[1]
        photos = provider.list_photos()
        ttl = provider.list_cache_ttl()
        self._list_cache[iid] = (now + ttl, photos)
        return photos

    def invalidate_list(self, instance_id: str) -> None:
        """Drop the in-memory photo list cached for *instance_id*, if any."""
        self._list_cache.pop(instance_id, None)

    def get_photo(self, provider: PhotoProvider, photo_id: str) -> Image.Image:
        """Return the full image, using the disk cache when the provider allows it.

        The disk cache is bypassed entirely when ``provider.photo_cache_ttl()``
        returns None.
        """
        ttl = provider.photo_cache_ttl()
        if ttl is not None:
            cached = self._read_photo_cache(provider.instance_id, photo_id, ttl)
            if cached is not None:
                return cached
        img = provider.get_photo(photo_id)
        if ttl is not None:
            self._write_photo_cache(provider.instance_id, photo_id, img)
        return img

    def get_thumbnail(self, provider: PhotoProvider, photo_id: str,
                      size: int = 300) -> bytes:
        """Return JPEG thumbnail bytes, generating and caching them on a miss.

        Cached thumbnails have no TTL; they stay until ``clear`` removes them.
        """
        stem = _cache_stem(provider.instance_id, photo_id)
        thumb_path = self._thumb_dir / f"{stem}_{size}.jpg"
        if thumb_path.exists():
            return thumb_path.read_bytes()
        img = self.get_photo(provider, photo_id)
        data = _make_thumbnail(img, size)
        thumb_path.parent.mkdir(parents=True, exist_ok=True)
        # Atomic so a crash mid-write cannot leave a truncated thumbnail that
        # would then be served indefinitely.
        _atomic_write_bytes(thumb_path, data)
        return data

    def clear(self, instance_id: str) -> None:
        """Clear all caches for a provider instance.

        Removes the in-memory list plus every thumbnail and photo file whose
        name starts with the instance prefix. Files written under an older
        naming scheme (without that prefix) are not recognised.
        """
        self.invalidate_list(instance_id)
        prefix = _instance_prefix(instance_id)
        for directory in (self._thumb_dir, self._photo_dir):
            if not directory.exists():
                continue
            for entry in directory.iterdir():
                if entry.name.startswith(prefix):
                    entry.unlink(missing_ok=True)

    def _read_photo_cache(self, instance_id: str, photo_id: str,
                          ttl: int) -> Image.Image | None:
        """Return the cached image as RGB, or None on a miss.

        A file older than *ttl* seconds (by mtime) or one that cannot be
        decoded is deleted and reported as a miss.
        """
        path = self._photo_dir / f"{_cache_stem(instance_id, photo_id)}.jpg"
        if not path.exists():
            return None
        age = time.time() - path.stat().st_mtime
        if age > ttl:
            path.unlink(missing_ok=True)
            return None
        try:
            # convert() fully decodes the pixels, so the file can be closed
            # as soon as the with-block exits.
            with Image.open(path) as cached:
                return cached.convert("RGB")
        except OSError:
            # Truncated or corrupt cache entry: drop it so the caller
            # re-fetches from the provider instead of failing until expiry.
            path.unlink(missing_ok=True)
            return None

    def _write_photo_cache(self, instance_id: str, photo_id: str,
                           img: Image.Image) -> None:
        """Store *img* in the disk cache as a quality-90 JPEG."""
        self._photo_dir.mkdir(parents=True, exist_ok=True)
        path = self._photo_dir / f"{_cache_stem(instance_id, photo_id)}.jpg"
        tmp = path.with_name(path.name + ".tmp")
        try:
            # Save under a temp name first so readers never see a partial file.
            _jpeg_ready(img).save(str(tmp), format="JPEG", quality=90)
            tmp.replace(path)
        finally:
            tmp.unlink(missing_ok=True)


# ---------------------------------------------------------------------------
# Provider instance manager
# ---------------------------------------------------------------------------

def _load_json(path: Path, default_factory):
    """Return the parsed JSON content of *path*, or ``default_factory()``.

    The default is returned when the file does not exist or cannot be read
    or parsed; the latter case prints a warning instead of failing.
    """
    try:
        if path.exists():
            return json.loads(path.read_text())
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(f"Warning: could not read {path}: {e}")
    return default_factory()


def _save_json(path: Path, data):
    """Serialize *data* as indented JSON to *path*, creating parent dirs.

    The content is written to a temp file and renamed into place so an
    interrupted write cannot truncate the existing file. If the rename is
    refused, the file is written directly instead.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(data, indent=2)
    tmp = path.with_name(path.name + ".tmp")
    try:
        tmp.write_text(payload)
        try:
            tmp.replace(path)
        except OSError:
            # Rename over the target failed; fall back to a plain
            # (non-atomic) overwrite rather than losing the update.
            path.write_text(payload)
    finally:
        tmp.unlink(missing_ok=True)


class ProviderManager:
    """Manages all provider instances, their configs, and the cache layer."""

    PROVIDERS_FILE = "providers.json"

    def __init__(self, data_dir: Path, default_photos_dir: str = "/photos"):
        """Prepare an empty manager; call ``load`` to read configs from disk.

        Args:
            data_dir: directory holding ``providers.json`` and ``cache/``.
            default_photos_dir: path used for the auto-created local provider.
        """
        self._data_dir = data_dir
        self._config_file = data_dir / self.PROVIDERS_FILE
        self._default_photos_dir = default_photos_dir
        self._instances: dict[str, PhotoProvider] = {}
        self._configs: dict[str, dict] = {}
        self.cache = ProviderCache(data_dir / "cache")

    def load(self) -> None:
        """Load provider configs from disk and instantiate all enabled providers.

        Configs of disabled, unknown-type, or failed-to-start providers are
        still kept in memory so they survive the next ``_persist``.
        """
        raw = _load_json(self._config_file, dict)
        if not raw:
            # First run or empty config — auto-create local-default if the
            # local provider type is registered
            if "local" in _PROVIDER_REGISTRY:
                raw = {
                    "local-default": {
                        "type": "local",
                        "enabled": True,
                        "config": {
                            "path": self._default_photos_dir,
                            "recursive": True,
                        },
                    }
                }
                _save_json(self._config_file, raw)

        for instance_id, cfg in raw.items():
            if not cfg.get("enabled", True):
                self._configs[instance_id] = cfg
                continue
            provider_type = cfg.get("type", "")
            if provider_type not in _PROVIDER_REGISTRY:
                print(f"Warning: unknown provider type {provider_type!r} "
                      f"for instance {instance_id!r}, skipping")
                self._configs[instance_id] = cfg
                continue
            try:
                self._create_instance(instance_id, cfg)
            except Exception as e:
                print(f"Warning: failed to start provider {instance_id!r}: {e}")
                self._configs[instance_id] = cfg

    def _create_instance(self, instance_id: str, cfg: dict) -> PhotoProvider:
        """Instantiate, start, and register a provider from its config entry.

        Nothing is registered if construction or ``startup`` raises.
        """
        cls = get_provider_class(cfg["type"])
        provider = cls(instance_id=instance_id, config=cfg.get("config", {}))
        provider.startup()
        self._instances[instance_id] = provider
        self._configs[instance_id] = cfg
        return provider

    def _persist(self) -> None:
        """Write all in-memory provider configs to the config file."""
        _save_json(self._config_file, self._configs)

    def add_provider(self, instance_id: str, provider_type: str,
                     config: dict, enabled: bool = True) -> PhotoProvider:
        """Create, start, and persist a new provider instance.

        Raises:
            ValueError: if *instance_id* contains ':' (reserved as the
                composite-key separator), already exists, or
                *provider_type* is not registered.
        """
        if ":" in instance_id:
            raise ValueError("Provider instance ID must not contain ':'")
        if instance_id in self._configs:
            raise ValueError(f"Provider {instance_id!r} already exists")
        cfg = {"type": provider_type, "enabled": enabled, "config": config}
        # NOTE(review): the instance is created and started even when
        # enabled=False; only the persisted flag differs. Confirm whether
        # callers rely on this before changing it.
        provider = self._create_instance(instance_id, cfg)
        self._persist()
        return provider

    def remove_provider(self, instance_id: str) -> None:
        """Shut down and forget a provider, clear its caches, and persist."""
        provider = self._instances.pop(instance_id, None)
        if provider:
            provider.shutdown()
        self._configs.pop(instance_id, None)
        self.cache.clear(instance_id)
        self._persist()

    def update_provider_config(self, instance_id: str, config: dict) -> None:
        """Replace a provider's config and restart it if it is enabled.

        Raises:
            ValueError: if *instance_id* is unknown.
        """
        if instance_id not in self._configs:
            raise ValueError(f"Provider {instance_id!r} not found")
        # Shutdown old instance
        old = self._instances.pop(instance_id, None)
        if old:
            old.shutdown()
        # Update config and restart
        self._configs[instance_id]["config"] = config
        cfg = self._configs[instance_id]
        if cfg.get("enabled", True) and cfg["type"] in _PROVIDER_REGISTRY:
            self._create_instance(instance_id, cfg)
        self.cache.invalidate_list(instance_id)
        self._persist()

    def set_enabled(self, instance_id: str, enabled: bool) -> None:
        """Enable or disable a provider, starting or stopping its instance.

        Raises:
            ValueError: if *instance_id* is unknown.
        """
        if instance_id not in self._configs:
            raise ValueError(f"Provider {instance_id!r} not found")
        self._configs[instance_id]["enabled"] = enabled
        if not enabled:
            provider = self._instances.pop(instance_id, None)
            if provider:
                provider.shutdown()
        elif instance_id not in self._instances:
            cfg = self._configs[instance_id]
            if cfg["type"] in _PROVIDER_REGISTRY:
                self._create_instance(instance_id, cfg)
        self._persist()

    def get_instance(self, instance_id: str) -> PhotoProvider | None:
        """Return the running instance for *instance_id*, or None."""
        return self._instances.get(instance_id)

    def all_instances(self) -> dict[str, PhotoProvider]:
        """Return a shallow copy of the running instances, keyed by id."""
        return dict(self._instances)

    def all_configs(self) -> dict[str, dict]:
        """Return a shallow copy of all stored configs, keyed by id."""
        return dict(self._configs)

    def shutdown_all(self) -> None:
        """Shut down every running provider; failures are logged, not raised."""
        for provider in self._instances.values():
            try:
                provider.shutdown()
            except Exception as e:
                # Best-effort: one failing provider must not block the rest.
                print(f"Warning: {provider.instance_id} shutdown failed: {e}")
        self._instances.clear()

    # -- Aggregated photo operations --

    def get_all_photos(self) -> list[PhotoRef]:
        """Return the photos of all running providers.

        A provider whose listing fails is skipped with a warning.
        """
        all_photos: list[PhotoRef] = []
        for provider in self._instances.values():
            try:
                photos = self.cache.get_photo_list(provider)
                all_photos.extend(photos)
            except Exception as e:
                print(f"Warning: {provider.instance_id} list_photos failed: {e}")
        return all_photos

    def get_photo_image(self, ref: PhotoRef) -> Image.Image:
        """Return the full image for *ref* via the cache layer.

        Raises:
            ValueError: if the ref's provider is not running.
        """
        provider = self._instances.get(ref.provider_id)
        if not provider:
            raise ValueError(f"Provider {ref.provider_id!r} not found or not enabled")
        return self.cache.get_photo(provider, ref.photo_id)

    def get_photo_thumbnail(self, ref: PhotoRef, size: int = 300) -> bytes:
        """Return JPEG thumbnail bytes for *ref* via the cache layer.

        Raises:
            ValueError: if the ref's provider is not running.
        """
        provider = self._instances.get(ref.provider_id)
        if not provider:
            raise ValueError(f"Provider {ref.provider_id!r} not found or not enabled")
        return self.cache.get_thumbnail(provider, ref.photo_id, size)

    def pick_random_photo(self) -> PhotoRef | None:
        """Return a random photo across all providers, or None if there are none."""
        all_photos = self.get_all_photos()
        if not all_photos:
            return None
        return random.choice(all_photos)

    def find_photo_ref(self, composite_key: str) -> PhotoRef | None:
        """Look up a PhotoRef by its composite key."""
        if ":" not in composite_key:
            return None
        # Split on the first ':' only; the photo id itself may contain ':'.
        provider_id, photo_id = composite_key.split(":", 1)
        provider = self._instances.get(provider_id)
        if not provider:
            return None
        for ref in self.cache.get_photo_list(provider):
            if ref.photo_id == photo_id:
                return ref
        return None

    def save_provider_auth_state(self, instance_id: str, auth_data: dict) -> None:
        """Persist auth tokens/sessions into the provider's config.

        Called by providers after OAuth/2FA completion. Unknown instance
        ids are ignored.
        """
        if instance_id not in self._configs:
            return
        self._configs[instance_id].setdefault("config", {}).update(auth_data)
        self._persist()


# ---------------------------------------------------------------------------
# Migration helper
# ---------------------------------------------------------------------------

def migrate_image_settings(data_dir: Path, default_provider_id: str = "local-default"):
    """Rewrite bare-filename keys in image_settings.json to composite keys.

    Called once on first startup with the provider framework. Does nothing
    if the file is missing, empty, or already has a key containing ':'.
    A backup copy is written to image_settings.json.bak before rewriting.
    """
    settings_file = data_dir / "image_settings.json"
    if not settings_file.exists():
        return
    raw = json.loads(settings_file.read_text())
    if not raw:
        return
    # Check if already migrated (keys contain ':')
    if any(":" in k for k in raw):
        return
    # Backup
    backup = data_dir / "image_settings.json.bak"
    shutil.copy2(settings_file, backup)
    # Rewrite keys
    migrated = {f"{default_provider_id}:{k}": v for k, v in raw.items()}
    _save_json(settings_file, migrated)
    print(f"Migrated {len(migrated)} image settings to composite keys "
          f"(backup at {backup})")