""" E-Ink Photo Frame Server Serves photos for the ESP32 photo frame and provides a web UI for management. Tracks frame status via heartbeat reports from the ESP32. Endpoints: GET / Web UI — gallery, upload, frame status, settings GET /photo Random JPEG processed for the display (called by ESP32) POST /heartbeat ESP32 reports what it displayed GET /api/status Frame status (for Home Assistant REST sensor) GET /api/photos List all photos as JSON POST /api/upload Upload new photos DELETE /api/photos/ Delete a photo GET /api/settings Get frame settings PUT /api/settings Update frame settings GET /api/photos//settings Get per-image settings PUT /api/photos//settings Update per-image settings GET /preview/ Dithered e-paper preview image GET /simulate/ Full simulator page with controls GET /health Health check """ import argparse import io import json import random import sys from datetime import datetime, timezone from pathlib import Path from flask import Flask, Response, jsonify, request, render_template_string from werkzeug.utils import secure_filename from PIL import Image import numpy as np from dither import dither_floyd_steinberg app = Flask(__name__) PHOTOS_DIR: Path = Path(".") PHYSICAL_WIDTH = 800 PHYSICAL_HEIGHT = 480 SUPPORTED_EXTENSIONS = {".jpg", ".jpeg", ".png", ".webp", ".heic", ".bmp", ".tiff"} DATA_DIR = Path("/data") STATE_FILE = DATA_DIR / "frame_state.json" SETTINGS_FILE = DATA_DIR / "settings.json" IMAGE_SETTINGS_FILE = DATA_DIR / "image_settings.json" # --- Persistence helpers --- def _load_json(path: Path, default): try: if path.exists(): return json.loads(path.read_text()) except Exception: pass return default() if callable(default) else default def _save_json(path: Path, data): try: path.parent.mkdir(parents=True, exist_ok=True) path.write_text(json.dumps(data, indent=2)) except Exception as e: print(f"Warning: could not save {path}: {e}", file=sys.stderr) # Frame state (runtime, updated by heartbeats) frame_state = _load_json(STATE_FILE, lambda: { "last_update": None, "current_photo": None, "ip": None, "updates": 0 }) # Frame settings (user-configured) DEFAULT_SETTINGS = { "orientation": "landscape", # landscape | portrait_cw | portrait_ccw "default_mode": "zoom", # zoom | letterbox "letterbox_color": [255, 255, 255], # white — looks like e-paper background } frame_settings = _load_json(SETTINGS_FILE, lambda: dict(DEFAULT_SETTINGS)) # Per-image settings: { "filename.jpg": { "mode": "zoom", "pan_x": 0.5, "pan_y": 0.3 }, ... } image_settings: dict = _load_json(IMAGE_SETTINGS_FILE, dict) def get_image_settings(name: str) -> dict: """Get effective settings for an image (per-image overrides merged with defaults).""" defaults = {"mode": None, "pan_x": 0.5, "pan_y": 0.5} per_image = image_settings.get(name, {}) merged = {**defaults, **per_image} if merged["mode"] is None: merged["mode"] = frame_settings.get("default_mode", "zoom") return merged # --- Image processing --- def get_logical_dimensions() -> tuple[int, int]: """Get the logical display dimensions based on frame orientation.""" orientation = frame_settings.get("orientation", "landscape") if orientation == "landscape": return PHYSICAL_WIDTH, PHYSICAL_HEIGHT # 800x480 else: return PHYSICAL_HEIGHT, PHYSICAL_WIDTH # 480x800 def process_image(img: Image.Image, settings: dict) -> Image.Image: """ Resize and transform an image for the display. Takes frame orientation into account: - Determines logical dimensions (800x480 or 480x800) - Applies zoom+pan or letterbox based on settings - Rotates to match physical 800x480 buffer if portrait Returns an 800x480 image ready for dithering / JPEG encoding. """ logical_w, logical_h = get_logical_dimensions() mode = settings.get("mode", "zoom") pan_x = settings.get("pan_x", 0.5) pan_y = settings.get("pan_y", 0.5) if mode == "zoom": img = _zoom_and_pan(img, logical_w, logical_h, pan_x, pan_y) else: img = _letterbox(img, logical_w, logical_h) # Rotate to match physical buffer if portrait orientation = frame_settings.get("orientation", "landscape") if orientation == "portrait_cw": img = img.rotate(-90, expand=True) elif orientation == "portrait_ccw": img = img.rotate(90, expand=True) return img def _zoom_and_pan(img: Image.Image, tw: int, th: int, pan_x: float, pan_y: float) -> Image.Image: """Scale to cover target, then crop at the specified pan position.""" target_ratio = tw / th img_ratio = img.width / img.height if img_ratio > target_ratio: # Image is wider — fit height, crop width new_height = th new_width = int(img.width * (th / img.height)) else: # Image is taller — fit width, crop height new_width = tw new_height = int(img.height * (tw / img.width)) img = img.resize((new_width, new_height), Image.LANCZOS) # Pan-aware crop: pan_x/pan_y are 0.0–1.0, controlling crop position max_left = new_width - tw max_top = new_height - th left = int(max_left * pan_x) top = int(max_top * pan_y) return img.crop((left, top, left + tw, top + th)) def _letterbox(img: Image.Image, tw: int, th: int) -> Image.Image: """Scale to fit inside target, pad with letterbox color.""" lb_color = tuple(frame_settings.get("letterbox_color", [255, 255, 255])) target_ratio = tw / th img_ratio = img.width / img.height if img_ratio > target_ratio: # Image is wider — fit width new_width = tw new_height = int(img.height * (tw / img.width)) else: # Image is taller — fit height new_height = th new_width = int(img.width * (th / img.height)) img = img.resize((new_width, new_height), Image.LANCZOS) canvas = Image.new("RGB", (tw, th), lb_color) paste_x = (tw - new_width) // 2 paste_y = (th - new_height) // 2 canvas.paste(img, (paste_x, paste_y)) return canvas def get_photo_list() -> list[Path]: photos = [] for f in PHOTOS_DIR.rglob("*"): if f.suffix.lower() in SUPPORTED_EXTENSIONS and f.is_file(): photos.append(f) return sorted(photos, key=lambda p: p.name) def make_thumbnail(img: Image.Image, size: int = 300) -> bytes: img.thumbnail((size, size), Image.LANCZOS) buf = io.BytesIO() img.save(buf, format="JPEG", quality=75) return buf.getvalue() # --- ESP32 endpoints --- @app.route("/photo") def random_photo(): photos = get_photo_list() if not photos: return jsonify({"error": "No photos found"}), 404 chosen = random.choice(photos) try: img = Image.open(chosen).convert("RGB") settings = get_image_settings(chosen.name) img = process_image(img, settings) buf = io.BytesIO() img.save(buf, format="JPEG", quality=85) buf.seek(0) return Response(buf.getvalue(), mimetype="image/jpeg", headers={"X-Photo-Name": chosen.name}) except Exception as e: print(f"Error processing {chosen}: {e}", file=sys.stderr) return jsonify({"error": str(e)}), 500 @app.route("/heartbeat", methods=["POST"]) def heartbeat(): global frame_state data = request.get_json(silent=True) or {} frame_state["last_update"] = datetime.now(timezone.utc).isoformat() frame_state["current_photo"] = data.get("photo", None) frame_state["ip"] = request.remote_addr frame_state["free_heap"] = data.get("free_heap", None) frame_state["updates"] = frame_state.get("updates", 0) + 1 _save_json(STATE_FILE, frame_state) return jsonify({"ok": True}) # --- Settings API --- @app.route("/api/settings", methods=["GET"]) def api_get_settings(): return jsonify(frame_settings) @app.route("/api/settings", methods=["PUT"]) def api_put_settings(): global frame_settings data = request.get_json() allowed_keys = {"orientation", "default_mode", "letterbox_color"} for k, v in data.items(): if k in allowed_keys: frame_settings[k] = v _save_json(SETTINGS_FILE, frame_settings) return jsonify(frame_settings) @app.route("/api/photos//settings", methods=["GET"]) def api_get_image_settings(name: str): fname = secure_filename(name) return jsonify(get_image_settings(fname)) @app.route("/api/photos//settings", methods=["PUT"]) def api_put_image_settings(name: str): fname = secure_filename(name) data = request.get_json() allowed_keys = {"mode", "pan_x", "pan_y"} current = image_settings.get(fname, {}) for k, v in data.items(): if k in allowed_keys: current[k] = v image_settings[fname] = current _save_json(IMAGE_SETTINGS_FILE, image_settings) return jsonify(get_image_settings(fname)) @app.route("/api/photos//settings", methods=["DELETE"]) def api_delete_image_settings(name: str): fname = secure_filename(name) image_settings.pop(fname, None) _save_json(IMAGE_SETTINGS_FILE, image_settings) return jsonify(get_image_settings(fname)) # --- Other API endpoints --- @app.route("/api/status") def api_status(): photos = get_photo_list() return jsonify({ "state": "online" if frame_state.get("last_update") else "waiting", "last_update": frame_state.get("last_update"), "current_photo": frame_state.get("current_photo"), "frame_ip": frame_state.get("ip"), "total_photos": len(photos), "total_updates": frame_state.get("updates", 0), "orientation": frame_settings.get("orientation", "landscape"), }) @app.route("/api/photos") def api_photos(): photos = get_photo_list() result = [] for p in photos: entry = {"name": p.name, "size": p.stat().st_size} if p.name in image_settings: entry["settings"] = image_settings[p.name] result.append(entry) return jsonify(result) @app.route("/api/upload", methods=["POST"]) def api_upload(): uploaded = [] for f in request.files.getlist("photos"): if not f.filename: continue fname = secure_filename(f.filename) ext = Path(fname).suffix.lower() if ext not in SUPPORTED_EXTENSIONS: continue dest = PHOTOS_DIR / fname f.save(str(dest)) uploaded.append(fname) return jsonify({"uploaded": uploaded, "count": len(uploaded)}) @app.route("/api/photos/", methods=["DELETE"]) def api_delete(name: str): fname = secure_filename(name) path = PHOTOS_DIR / fname if not path.exists(): return jsonify({"error": "not found"}), 404 path.unlink() image_settings.pop(fname, None) _save_json(IMAGE_SETTINGS_FILE, image_settings) return jsonify({"deleted": fname}) @app.route("/thumb/") def thumbnail(name: str): fname = secure_filename(name) path = PHOTOS_DIR / fname if not path.exists(): return "", 404 try: size = request.args.get("size", 300, type=int) size = min(size, 1600) img = Image.open(path).convert("RGB") data = make_thumbnail(img, size) return Response(data, mimetype="image/jpeg") except Exception: return "", 500 # --- Simulator / Preview --- @app.route("/preview/") def preview_photo(name: str): fname = secure_filename(name) path = PHOTOS_DIR / fname if not path.exists(): return jsonify({"error": "not found"}), 404 # Allow query param overrides for interactive simulator settings = get_image_settings(fname) if "mode" in request.args: settings["mode"] = request.args["mode"] if "pan_x" in request.args: settings["pan_x"] = float(request.args["pan_x"]) if "pan_y" in request.args: settings["pan_y"] = float(request.args["pan_y"]) try: img = Image.open(path).convert("RGB") img = process_image(img, settings) _, preview = dither_floyd_steinberg(img) buf = io.BytesIO() preview.save(buf, format="PNG") buf.seek(0) return Response(buf.getvalue(), mimetype="image/png") except Exception as e: print(f"Preview error: {e}", file=sys.stderr) return jsonify({"error": str(e)}), 500 @app.route("/preview") def preview_random(): photos = get_photo_list() if not photos: return jsonify({"error": "No photos found"}), 404 chosen = random.choice(photos) return preview_photo(chosen.name) @app.route("/api/photos//geometry") def photo_geometry(name: str): """Return source image dimensions and the computed crop/fit geometry.""" fname = secure_filename(name) path = PHOTOS_DIR / fname if not path.exists(): return jsonify({"error": "not found"}), 404 img = Image.open(path) lw, lh = get_logical_dimensions() return jsonify({ "source_width": img.width, "source_height": img.height, "logical_width": lw, "logical_height": lh, "logical_aspect": lw / lh, }) @app.route("/simulate/") def simulate_page(name: str): fname = secure_filename(name) path = PHOTOS_DIR / fname if not path.exists(): return "", 404 settings = get_image_settings(fname) orientation = frame_settings.get("orientation", "landscape") lw, lh = get_logical_dimensions() return render_template_string(SIMULATE_UI, photo_name=fname, settings=settings, orientation=orientation, logical_w=lw, logical_h=lh) SIMULATE_UI = """ {{ photo_name }} — Frame Preview
← Gallery {{ photo_name }}
E-Paper Preview
""" # --- Web UI --- @app.route("/") def index(): photos = get_photo_list() orientation = frame_settings.get("orientation", "landscape") default_mode = frame_settings.get("default_mode", "zoom") return render_template_string(WEB_UI, photos=photos, state=frame_state, photo_count=len(photos), orientation=orientation, default_mode=default_mode) @app.route("/health") def health(): photos = get_photo_list() return jsonify({"status": "ok", "photo_count": len(photos)}) WEB_UI = """ Photo Frame

Photo Frame

Frame Status {{ 'Online' if state.get('last_update') else 'Waiting for first update' }}
{% if state.get('last_update') %}
Last Update {{ state.last_update[:19] }}
Showing {{ state.get('current_photo', 'Unknown') }}
{% endif %}
Photos {{ photo_count }}
Orientation
Default Mode

Drop photos here or click to upload

{% if photos %} {% else %}
No photos yet. Upload some!
{% endif %}
""" if __name__ == "__main__": parser = argparse.ArgumentParser(description="E-Ink Photo Frame Server") parser.add_argument("--port", type=int, default=8473) parser.add_argument("--photos-dir", type=str, default="./photos") args = parser.parse_args() PHOTOS_DIR = Path(args.photos_dir) DATA_DIR = Path(args.photos_dir) / ".data" STATE_FILE = DATA_DIR / "frame_state.json" SETTINGS_FILE = DATA_DIR / "settings.json" IMAGE_SETTINGS_FILE = DATA_DIR / "image_settings.json" if not PHOTOS_DIR.exists(): PHOTOS_DIR.mkdir(parents=True, exist_ok=True) # Reload with correct paths frame_state = _load_json(STATE_FILE, lambda: { "last_update": None, "current_photo": None, "ip": None, "updates": 0 }) frame_settings = _load_json(SETTINGS_FILE, lambda: dict(DEFAULT_SETTINGS)) image_settings = _load_json(IMAGE_SETTINGS_FILE, dict) print(f"Serving photos from: {PHOTOS_DIR.resolve()}") print(f"Found {len(get_photo_list())} photos") print(f"Orientation: {frame_settings['orientation']}, Default mode: {frame_settings['default_mode']}") print(f"Listening on port {args.port}") app.run(host="0.0.0.0", port=args.port)