Files
fuji-photo-processor/processor.py
Nick Roodenrijs 88db0c1b24 Fix SQLite threading bug, correct FTP upload path
- Add check_same_thread=False to SQLite connection (watchdog
  callbacks run in a separate thread)
- Camera upload dir is /data/photos/incoming (not /incoming)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-08 19:44:27 +01:00

223 lines
6.9 KiB
Python

"""
Fuji Photo Processor
Watches the incoming upload directory for new JPEGs, moves originals aside and creates resized copies.
Designed for Fuji X-H2 → FTP → Synology NAS → Immich pipeline.
"""
import logging
import os
import shutil
import sqlite3
import sys
import time
from datetime import datetime
from pathlib import Path
from PIL import Image
from watchdog.events import FileSystemEventHandler
from watchdog.observers.polling import PollingObserver
# ---------------------------------------------------------------------------
# Configuration from environment
# ---------------------------------------------------------------------------
# Tunables, overridable via environment variables.
POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "30"))  # seconds between directory polls
JPEG_QUALITY = int(os.environ.get("JPEG_QUALITY", "85"))    # quality of resized JPEG copies
MAX_WIDTH = int(os.environ.get("MAX_WIDTH", "1080"))        # bounding box for resized copies
MAX_HEIGHT = int(os.environ.get("MAX_HEIGHT", "1920"))

# Fixed container paths. The camera's FTP upload directory is
# /data/photos/incoming, not /incoming (per the commit note fixing the
# FTP upload path).
INCOMING_DIR = Path("/data/photos/incoming")
ORIGINALS_DIR = Path("/originals")
PROCESSED_DIR = Path("/processed")
DB_PATH = Path("/data/processed.db")

# File extensions treated as JPEGs (compared case-insensitively).
JPEG_EXTENSIONS = {".jpg", ".jpeg"}
# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
# Everything goes to stdout so the container runtime captures it.
logging.basicConfig(
    stream=sys.stdout,
    level=logging.INFO,
    datefmt="%Y-%m-%d %H:%M:%S",
    format="%(asctime)s [%(levelname)s] %(message)s",
)
log = logging.getLogger("processor")
# ---------------------------------------------------------------------------
# Database
# ---------------------------------------------------------------------------
def init_db(db_path=None):
    """Open (creating if necessary) the SQLite tracking database.

    Parameters
    ----------
    db_path : str | Path | None
        Location of the database file. Defaults to the module-level
        ``DB_PATH``; parameterized so tests can point at a temp file.
        Parent directories are created as needed.

    Returns
    -------
    sqlite3.Connection
        Connection with ``check_same_thread=False`` because watchdog
        delivers events on a separate thread from the one that opened
        the connection here.
    """
    path = Path(db_path) if db_path is not None else DB_PATH
    path.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(str(path), check_same_thread=False)
    conn.execute(
        """CREATE TABLE IF NOT EXISTS processed_files (
            filename TEXT PRIMARY KEY,
            processed_at TEXT NOT NULL,
            original_path TEXT NOT NULL,
            resized_path TEXT NOT NULL
        )"""
    )
    conn.commit()
    return conn
def is_already_processed(conn, filename):
    """Return True when *filename* already has a row in processed_files."""
    cursor = conn.execute(
        "SELECT 1 FROM processed_files WHERE filename = ?", (filename,)
    )
    return cursor.fetchone() is not None
def mark_processed(conn, filename, original_path, resized_path):
    """Insert a processed_files row recording where the file ended up."""
    timestamp = datetime.now().isoformat()
    conn.execute(
        "INSERT INTO processed_files (filename, processed_at, original_path, resized_path) "
        "VALUES (?, ?, ?, ?)",
        (filename, timestamp, str(original_path), str(resized_path)),
    )
    conn.commit()
# ---------------------------------------------------------------------------
# Processing
# ---------------------------------------------------------------------------
def is_jpeg(path):
    """Return True when *path* has a JPEG extension, case-insensitively."""
    ext = Path(path).suffix.lower()
    return ext in JPEG_EXTENSIONS
def process_file(filepath, conn):
    """Move an incoming JPEG to the originals tree and write a resized copy.

    Files are bucketed into year/month subdirectories derived from their
    modification time, and the resized copy carries over EXIF bytes when
    present. Failures are logged, never raised — the watcher must keep
    running regardless of one bad file.
    """
    filepath = Path(filepath)

    # Guard clauses: vanished files and non-JPEGs are silent no-ops;
    # repeats are logged and skipped.
    if not (filepath.exists() and is_jpeg(filepath)):
        return
    filename = filepath.name
    if is_already_processed(conn, filename):
        log.info("Skipping already processed: %s", filename)
        return

    try:
        # Year/month bucket from the file's modification time.
        taken = datetime.fromtimestamp(filepath.stat().st_mtime)
        year_month = f"{taken.year}/{taken.month:02d}"

        original_dest = ORIGINALS_DIR / year_month / filename
        resized_dest = PROCESSED_DIR / year_month / filename
        for dest in (original_dest, resized_dest):
            dest.parent.mkdir(parents=True, exist_ok=True)

        # Write the resized copy first, preserving EXIF when available.
        with Image.open(filepath) as image:
            exif_data = image.info.get("exif", b"")
            image.thumbnail((MAX_WIDTH, MAX_HEIGHT), Image.LANCZOS)
            save_kwargs = {"quality": JPEG_QUALITY}
            if exif_data:
                save_kwargs["exif"] = exif_data
            image.save(str(resized_dest), "JPEG", **save_kwargs)

        # Only then move the original out of the incoming directory and
        # record the result.
        shutil.move(str(filepath), str(original_dest))
        mark_processed(conn, filename, original_dest, resized_dest)
        log.info("Processed: %s → originals/%s, processed/%s", filename, year_month, year_month)
    except Exception:
        log.exception("Error processing %s", filename)
# ---------------------------------------------------------------------------
# Watchdog handler
# ---------------------------------------------------------------------------
class PhotoHandler(FileSystemEventHandler):
    """Reacts to JPEG files appearing in the watched incoming directory."""

    def __init__(self, conn):
        super().__init__()
        self.conn = conn

    def _settle_and_process(self, path):
        # Brief pause so slow FTP uploads finish writing before we read.
        time.sleep(2)
        process_file(path, self.conn)

    def on_created(self, event):
        """A brand-new file appeared in the watched directory."""
        if event.is_directory or not is_jpeg(event.src_path):
            return
        log.info("New file detected: %s", event.src_path)
        self._settle_and_process(event.src_path)

    def on_moved(self, event):
        """Handle FTP clients that upload to a temp name, then rename."""
        if event.is_directory or not is_jpeg(event.dest_path):
            return
        log.info("Renamed file detected: %s", event.dest_path)
        self._settle_and_process(event.dest_path)
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def scan_existing(conn):
    """Process files already sitting in the incoming directory.

    Run once at startup so uploads that arrived while the processor was
    down are not missed.

    Parameters
    ----------
    conn : sqlite3.Connection
        Tracking-database connection passed through to process_file.
    """
    jpegs = [f for f in INCOMING_DIR.glob("*") if f.is_file() and is_jpeg(f)]
    if jpegs:
        # Log the actual watched path rather than a hard-coded
        # "/incoming/", so the message stays correct if INCOMING_DIR
        # is reconfigured.
        log.info("Found %d existing JPEG(s) in %s, processing...", len(jpegs), INCOMING_DIR)
        for filepath in jpegs:
            process_file(filepath, conn)
    else:
        log.info("No existing files in %s", INCOMING_DIR)
def main():
    """Entry point: init DB, catch up on existing files, then watch forever."""
    log.info("=== Fuji Photo Processor ===")
    log.info(
        "Config: poll=%ds, quality=%d, max_size=%dx%d",
        POLL_INTERVAL, JPEG_QUALITY, MAX_WIDTH, MAX_HEIGHT,
    )
    log.info("Watching: %s", INCOMING_DIR)

    conn = init_db()
    log.info("Database initialized: %s", DB_PATH)

    # Pick up anything that arrived while the processor was down.
    scan_existing(conn)

    # NOTE(review): PollingObserver presumably chosen because the incoming
    # directory may live on a network mount where inotify-style events do
    # not fire — confirm before swapping in a native observer.
    observer = PollingObserver(timeout=POLL_INTERVAL)
    observer.schedule(PhotoHandler(conn), str(INCOMING_DIR), recursive=False)
    observer.start()
    log.info("Watching for new files (polling every %ds)...", POLL_INTERVAL)

    # Idle until interrupted; the observer thread does the real work.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        log.info("Shutting down...")
        observer.stop()
    observer.join()
    conn.close()


if __name__ == "__main__":
    main()