Initial commit: Fuji photo processor pipeline
Automatic photo processing: Fuji X-H2 → FTP → Synology NAS → resize → Immich - PollingObserver watches /incoming/ for new JPEGs - Moves originals to /originals/YYYY/MM/ - Creates resized copies (1080x1920 @ 85%) with EXIF preserved - SQLite tracking to prevent duplicate processing - Deploy script for Synology NAS (docker run) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
222
processor.py
Normal file
222
processor.py
Normal file
@@ -0,0 +1,222 @@
|
||||
"""
|
||||
Fuji Photo Processor
|
||||
Watches /incoming/ for new JPEGs, moves originals and creates resized copies.
|
||||
Designed for Fuji X-H2 → FTP → Synology NAS → Immich pipeline.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
import sqlite3
|
||||
import sys
|
||||
import time
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
from PIL import Image
|
||||
from watchdog.events import FileSystemEventHandler
|
||||
from watchdog.observers.polling import PollingObserver
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Configuration from environment
|
||||
# ---------------------------------------------------------------------------
|
||||
POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "30"))
|
||||
JPEG_QUALITY = int(os.environ.get("JPEG_QUALITY", "85"))
|
||||
MAX_WIDTH = int(os.environ.get("MAX_WIDTH", "1080"))
|
||||
MAX_HEIGHT = int(os.environ.get("MAX_HEIGHT", "1920"))
|
||||
|
||||
INCOMING_DIR = Path("/incoming")
|
||||
ORIGINALS_DIR = Path("/originals")
|
||||
PROCESSED_DIR = Path("/processed")
|
||||
DB_PATH = Path("/data/processed.db")
|
||||
|
||||
JPEG_EXTENSIONS = {".jpg", ".jpeg"}
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Logging
|
||||
# ---------------------------------------------------------------------------
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s [%(levelname)s] %(message)s",
|
||||
datefmt="%Y-%m-%d %H:%M:%S",
|
||||
stream=sys.stdout,
|
||||
)
|
||||
log = logging.getLogger("processor")
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Database
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def init_db():
|
||||
"""Initialize the SQLite tracking database."""
|
||||
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
|
||||
conn = sqlite3.connect(str(DB_PATH))
|
||||
conn.execute(
|
||||
"""CREATE TABLE IF NOT EXISTS processed_files (
|
||||
filename TEXT PRIMARY KEY,
|
||||
processed_at TEXT NOT NULL,
|
||||
original_path TEXT NOT NULL,
|
||||
resized_path TEXT NOT NULL
|
||||
)"""
|
||||
)
|
||||
conn.commit()
|
||||
return conn
|
||||
|
||||
|
||||
def is_already_processed(conn, filename):
|
||||
"""Check if a file has already been processed."""
|
||||
row = conn.execute(
|
||||
"SELECT 1 FROM processed_files WHERE filename = ?", (filename,)
|
||||
).fetchone()
|
||||
return row is not None
|
||||
|
||||
|
||||
def mark_processed(conn, filename, original_path, resized_path):
|
||||
"""Record a file as processed."""
|
||||
conn.execute(
|
||||
"INSERT INTO processed_files (filename, processed_at, original_path, resized_path) "
|
||||
"VALUES (?, ?, ?, ?)",
|
||||
(filename, datetime.now().isoformat(), str(original_path), str(resized_path)),
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Processing
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def is_jpeg(path):
|
||||
"""Check if a file path has a JPEG extension (case-insensitive)."""
|
||||
return Path(path).suffix.lower() in JPEG_EXTENSIONS
|
||||
|
||||
|
||||
def process_file(filepath, conn):
|
||||
"""Move original and create a resized copy of a JPEG file."""
|
||||
filepath = Path(filepath)
|
||||
|
||||
if not filepath.exists():
|
||||
return
|
||||
|
||||
if not is_jpeg(filepath):
|
||||
return
|
||||
|
||||
filename = filepath.name
|
||||
|
||||
if is_already_processed(conn, filename):
|
||||
log.info("Skipping already processed: %s", filename)
|
||||
return
|
||||
|
||||
try:
|
||||
# Determine year/month from file modification time
|
||||
mtime = filepath.stat().st_mtime
|
||||
dt = datetime.fromtimestamp(mtime)
|
||||
year_month = f"{dt.year}/{dt.month:02d}"
|
||||
|
||||
# Paths
|
||||
original_dest = ORIGINALS_DIR / year_month / filename
|
||||
resized_dest = PROCESSED_DIR / year_month / filename
|
||||
|
||||
# Create directories
|
||||
original_dest.parent.mkdir(parents=True, exist_ok=True)
|
||||
resized_dest.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Resize and save with EXIF preserved
|
||||
with Image.open(filepath) as image:
|
||||
exif_data = image.info.get("exif", b"")
|
||||
image.thumbnail((MAX_WIDTH, MAX_HEIGHT), Image.LANCZOS)
|
||||
|
||||
save_kwargs = {"quality": JPEG_QUALITY}
|
||||
if exif_data:
|
||||
save_kwargs["exif"] = exif_data
|
||||
|
||||
image.save(str(resized_dest), "JPEG", **save_kwargs)
|
||||
|
||||
# Move original out of incoming
|
||||
shutil.move(str(filepath), str(original_dest))
|
||||
|
||||
# Track in database
|
||||
mark_processed(conn, filename, original_dest, resized_dest)
|
||||
log.info("Processed: %s → originals/%s, processed/%s", filename, year_month, year_month)
|
||||
|
||||
except Exception:
|
||||
log.exception("Error processing %s", filename)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Watchdog handler
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class PhotoHandler(FileSystemEventHandler):
|
||||
"""Handles new JPEG files appearing in the incoming directory."""
|
||||
|
||||
def __init__(self, conn):
|
||||
super().__init__()
|
||||
self.conn = conn
|
||||
|
||||
def on_created(self, event):
|
||||
if event.is_directory:
|
||||
return
|
||||
if is_jpeg(event.src_path):
|
||||
log.info("New file detected: %s", event.src_path)
|
||||
# Small delay to ensure file is fully written (FTP uploads)
|
||||
time.sleep(2)
|
||||
process_file(event.src_path, self.conn)
|
||||
|
||||
def on_moved(self, event):
|
||||
"""Handle FTP clients that create temp files then rename."""
|
||||
if event.is_directory:
|
||||
return
|
||||
if is_jpeg(event.dest_path):
|
||||
log.info("Renamed file detected: %s", event.dest_path)
|
||||
time.sleep(2)
|
||||
process_file(event.dest_path, self.conn)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Main
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def scan_existing(conn):
|
||||
"""Process any existing files in /incoming/ (catch-up after restart)."""
|
||||
existing = list(INCOMING_DIR.glob("*"))
|
||||
jpegs = [f for f in existing if f.is_file() and is_jpeg(f)]
|
||||
if jpegs:
|
||||
log.info("Found %d existing JPEG(s) in /incoming/, processing...", len(jpegs))
|
||||
for filepath in jpegs:
|
||||
process_file(filepath, conn)
|
||||
else:
|
||||
log.info("No existing files in /incoming/")
|
||||
|
||||
|
||||
def main():
|
||||
log.info("=== Fuji Photo Processor ===")
|
||||
log.info("Config: poll=%ds, quality=%d, max_size=%dx%d",
|
||||
POLL_INTERVAL, JPEG_QUALITY, MAX_WIDTH, MAX_HEIGHT)
|
||||
log.info("Watching: %s", INCOMING_DIR)
|
||||
|
||||
conn = init_db()
|
||||
log.info("Database initialized: %s", DB_PATH)
|
||||
|
||||
# Catch-up scan
|
||||
scan_existing(conn)
|
||||
|
||||
# Start watching
|
||||
handler = PhotoHandler(conn)
|
||||
observer = PollingObserver(timeout=POLL_INTERVAL)
|
||||
observer.schedule(handler, str(INCOMING_DIR), recursive=False)
|
||||
observer.start()
|
||||
log.info("Watching for new files (polling every %ds)...", POLL_INTERVAL)
|
||||
|
||||
try:
|
||||
while True:
|
||||
time.sleep(1)
|
||||
except KeyboardInterrupt:
|
||||
log.info("Shutting down...")
|
||||
observer.stop()
|
||||
|
||||
observer.join()
|
||||
conn.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user