""" Fuji Photo Processor Watches /incoming/ for new JPEGs, moves originals and creates resized copies. Designed for Fuji X-H2 → FTP → Synology NAS → Immich pipeline. """ import logging import os import shutil import sqlite3 import sys import time from datetime import datetime from pathlib import Path from PIL import Image from watchdog.events import FileSystemEventHandler from watchdog.observers.polling import PollingObserver # --------------------------------------------------------------------------- # Configuration from environment # --------------------------------------------------------------------------- POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "30")) JPEG_QUALITY = int(os.environ.get("JPEG_QUALITY", "85")) MAX_WIDTH = int(os.environ.get("MAX_WIDTH", "1080")) MAX_HEIGHT = int(os.environ.get("MAX_HEIGHT", "1920")) INCOMING_DIR = Path("/incoming") ORIGINALS_DIR = Path("/originals") PROCESSED_DIR = Path("/processed") DB_PATH = Path("/data/processed.db") JPEG_EXTENSIONS = {".jpg", ".jpeg"} # --------------------------------------------------------------------------- # Logging # --------------------------------------------------------------------------- logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S", stream=sys.stdout, ) log = logging.getLogger("processor") # --------------------------------------------------------------------------- # Database # --------------------------------------------------------------------------- def init_db(): """Initialize the SQLite tracking database.""" DB_PATH.parent.mkdir(parents=True, exist_ok=True) conn = sqlite3.connect(str(DB_PATH)) conn.execute( """CREATE TABLE IF NOT EXISTS processed_files ( filename TEXT PRIMARY KEY, processed_at TEXT NOT NULL, original_path TEXT NOT NULL, resized_path TEXT NOT NULL )""" ) conn.commit() return conn def is_already_processed(conn, filename): """Check if a file has already been processed.""" row = conn.execute( "SELECT 1 FROM processed_files WHERE filename = ?", (filename,) ).fetchone() return row is not None def mark_processed(conn, filename, original_path, resized_path): """Record a file as processed.""" conn.execute( "INSERT INTO processed_files (filename, processed_at, original_path, resized_path) " "VALUES (?, ?, ?, ?)", (filename, datetime.now().isoformat(), str(original_path), str(resized_path)), ) conn.commit() # --------------------------------------------------------------------------- # Processing # --------------------------------------------------------------------------- def is_jpeg(path): """Check if a file path has a JPEG extension (case-insensitive).""" return Path(path).suffix.lower() in JPEG_EXTENSIONS def process_file(filepath, conn): """Move original and create a resized copy of a JPEG file.""" filepath = Path(filepath) if not filepath.exists(): return if not is_jpeg(filepath): return filename = filepath.name if is_already_processed(conn, filename): log.info("Skipping already processed: %s", filename) return try: # Determine year/month from file modification time mtime = filepath.stat().st_mtime dt = datetime.fromtimestamp(mtime) year_month = f"{dt.year}/{dt.month:02d}" # Paths original_dest = ORIGINALS_DIR / year_month / filename resized_dest = PROCESSED_DIR / year_month / filename # Create directories original_dest.parent.mkdir(parents=True, exist_ok=True) resized_dest.parent.mkdir(parents=True, exist_ok=True) # Resize and save with EXIF preserved with Image.open(filepath) as image: exif_data = image.info.get("exif", b"") image.thumbnail((MAX_WIDTH, MAX_HEIGHT), Image.LANCZOS) save_kwargs = {"quality": JPEG_QUALITY} if exif_data: save_kwargs["exif"] = exif_data image.save(str(resized_dest), "JPEG", **save_kwargs) # Move original out of incoming shutil.move(str(filepath), str(original_dest)) # Track in database mark_processed(conn, filename, original_dest, resized_dest) log.info("Processed: %s → originals/%s, processed/%s", filename, year_month, year_month) except Exception: log.exception("Error processing %s", filename) # --------------------------------------------------------------------------- # Watchdog handler # --------------------------------------------------------------------------- class PhotoHandler(FileSystemEventHandler): """Handles new JPEG files appearing in the incoming directory.""" def __init__(self, conn): super().__init__() self.conn = conn def on_created(self, event): if event.is_directory: return if is_jpeg(event.src_path): log.info("New file detected: %s", event.src_path) # Small delay to ensure file is fully written (FTP uploads) time.sleep(2) process_file(event.src_path, self.conn) def on_moved(self, event): """Handle FTP clients that create temp files then rename.""" if event.is_directory: return if is_jpeg(event.dest_path): log.info("Renamed file detected: %s", event.dest_path) time.sleep(2) process_file(event.dest_path, self.conn) # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- def scan_existing(conn): """Process any existing files in /incoming/ (catch-up after restart).""" existing = list(INCOMING_DIR.glob("*")) jpegs = [f for f in existing if f.is_file() and is_jpeg(f)] if jpegs: log.info("Found %d existing JPEG(s) in /incoming/, processing...", len(jpegs)) for filepath in jpegs: process_file(filepath, conn) else: log.info("No existing files in /incoming/") def main(): log.info("=== Fuji Photo Processor ===") log.info("Config: poll=%ds, quality=%d, max_size=%dx%d", POLL_INTERVAL, JPEG_QUALITY, MAX_WIDTH, MAX_HEIGHT) log.info("Watching: %s", INCOMING_DIR) conn = init_db() log.info("Database initialized: %s", DB_PATH) # Catch-up scan scan_existing(conn) # Start watching handler = PhotoHandler(conn) observer = PollingObserver(timeout=POLL_INTERVAL) observer.schedule(handler, str(INCOMING_DIR), recursive=False) observer.start() log.info("Watching for new files (polling every %ds)...", POLL_INTERVAL) try: while True: time.sleep(1) except KeyboardInterrupt: log.info("Shutting down...") observer.stop() observer.join() conn.close() if __name__ == "__main__": main()