- Add check_same_thread=False to SQLite connection (watchdog callbacks run in a separate thread) - Camera upload dir is /data/photos/incoming (not /incoming) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
223 lines
6.9 KiB
Python
223 lines
6.9 KiB
Python
"""
|
|
Fuji Photo Processor
|
|
Watches /incoming/ for new JPEGs, moves originals and creates resized copies.
|
|
Designed for Fuji X-H2 → FTP → Synology NAS → Immich pipeline.
|
|
"""
|
|
|
|
import logging
|
|
import os
|
|
import shutil
|
|
import sqlite3
|
|
import sys
|
|
import time
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
from PIL import Image
|
|
from watchdog.events import FileSystemEventHandler
|
|
from watchdog.observers.polling import PollingObserver
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Configuration from environment
|
|
# ---------------------------------------------------------------------------
|
|
# Tunables are read from the environment once, at import time. Each falls back
# to the default shown when the variable is unset; a value that is not a valid
# integer raises ValueError during import.
POLL_INTERVAL = int(os.getenv("POLL_INTERVAL", "30"))
JPEG_QUALITY = int(os.getenv("JPEG_QUALITY", "85"))
MAX_WIDTH = int(os.getenv("MAX_WIDTH", "1080"))
MAX_HEIGHT = int(os.getenv("MAX_HEIGHT", "1920"))

# Fixed filesystem locations used by the processor.
INCOMING_DIR = Path("/incoming")
ORIGINALS_DIR = Path("/originals")
PROCESSED_DIR = Path("/processed")
DB_PATH = Path("/data/processed.db")

# Lower-case file suffixes that are treated as JPEG.
JPEG_EXTENSIONS = {".jpg", ".jpeg"}
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Logging
|
|
# ---------------------------------------------------------------------------
|
|
# Root logger: timestamped records at INFO and above, written to stdout.
logging.basicConfig(
    stream=sys.stdout,
    level=logging.INFO,
    datefmt="%Y-%m-%d %H:%M:%S",
    format="%(asctime)s [%(levelname)s] %(message)s",
)

# Named logger used by the functions in this module.
log = logging.getLogger("processor")
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Database
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def init_db():
    """Open the SQLite tracking database, creating it on first use.

    The directory that holds ``DB_PATH`` is created if it is missing, and the
    ``processed_files`` table is created if it does not exist yet.

    Returns:
        An open ``sqlite3.Connection`` to ``DB_PATH``.
    """
    DB_PATH.parent.mkdir(parents=True, exist_ok=True)

    schema = """CREATE TABLE IF NOT EXISTS processed_files (
        filename TEXT PRIMARY KEY,
        processed_at TEXT NOT NULL,
        original_path TEXT NOT NULL,
        resized_path TEXT NOT NULL
    )"""

    # check_same_thread=False disables sqlite3's check that a connection is
    # only used on the thread that created it; this connection is later handed
    # to the watchdog handler.
    db = sqlite3.connect(str(DB_PATH), check_same_thread=False)
    db.execute(schema)
    db.commit()
    return db
|
|
|
|
|
|
def is_already_processed(conn, filename):
    """Report whether ``filename`` already has a row in ``processed_files``.

    Args:
        conn: Open SQLite connection holding the ``processed_files`` table.
        filename: Bare file name to look up (matched exactly).

    Returns:
        True if a matching row exists, otherwise False.
    """
    query = "SELECT 1 FROM processed_files WHERE filename = ?"
    cursor = conn.execute(query, (filename,))
    return cursor.fetchone() is not None
|
|
|
|
|
|
def mark_processed(conn, filename, original_path, resized_path):
    """Insert a ``processed_files`` row for ``filename`` and commit it.

    Args:
        conn: Open SQLite connection holding the ``processed_files`` table.
        filename: Bare file name; this is the table's primary key.
        original_path: Where the original file was moved to (stored as str).
        resized_path: Where the resized copy was written (stored as str).

    The ``processed_at`` column is set to the current local time in ISO 8601
    form.
    """
    row = (
        filename,
        datetime.now().isoformat(),
        str(original_path),
        str(resized_path),
    )
    conn.execute(
        "INSERT INTO processed_files (filename, processed_at, original_path, resized_path) "
        "VALUES (?, ?, ?, ?)",
        row,
    )
    conn.commit()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Processing
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def is_jpeg(path):
    """Return True when ``path`` ends in a JPEG suffix, ignoring case.

    Only the file name's suffix is inspected; the file itself is never opened.
    """
    suffix = Path(path).suffix
    return suffix.lower() in JPEG_EXTENSIONS
|
|
|
|
|
|
def _save_resized_copy(source, dest):
    """Write a downscaled JPEG copy of ``source`` to ``dest``.

    The image is shrunk to fit within ``MAX_WIDTH`` x ``MAX_HEIGHT`` using
    ``Image.thumbnail`` and saved at ``JPEG_QUALITY``. The EXIF block and the
    embedded ICC colour profile are carried over when the source has them, so
    the copy keeps its metadata and renders with the same colours.

    Raises:
        Exception: Whatever Pillow raises while decoding or encoding. If the
            save step fails, any partially written ``dest`` is removed first.
    """
    with Image.open(source) as image:
        # Read metadata before resizing so it can be re-attached on save.
        exif_data = image.info.get("exif", b"")
        icc_profile = image.info.get("icc_profile")

        image.thumbnail((MAX_WIDTH, MAX_HEIGHT), Image.LANCZOS)

        save_kwargs = {"quality": JPEG_QUALITY}
        if exif_data:
            save_kwargs["exif"] = exif_data
        if icc_profile:
            save_kwargs["icc_profile"] = icc_profile

        try:
            image.save(str(dest), "JPEG", **save_kwargs)
        except Exception:
            # Do not leave a truncated JPEG behind in the processed tree.
            if dest.exists():
                dest.unlink()
            raise


def process_file(filepath, conn):
    """Move an incoming JPEG to the originals tree and write a resized copy.

    Files that no longer exist, are not JPEGs, or whose name is already
    recorded in the database are skipped. Otherwise the file is filed under
    ``<year>/<month>`` (taken from its modification time, local clock) in both
    ``ORIGINALS_DIR`` and ``PROCESSED_DIR``, and the result is recorded in the
    database.

    Args:
        filepath: Path (str or ``Path``) of the candidate file.
        conn: Open SQLite connection used for the processed-files tracking.

    Returns:
        None. Errors during processing are logged with a traceback and
        swallowed so one bad file does not stop the caller.
    """
    filepath = Path(filepath)

    if not filepath.exists() or not is_jpeg(filepath):
        return

    filename = filepath.name

    if is_already_processed(conn, filename):
        log.info("Skipping already processed: %s", filename)
        return

    try:
        # Determine year/month from file modification time
        dt = datetime.fromtimestamp(filepath.stat().st_mtime)
        year_month = f"{dt.year}/{dt.month:02d}"

        original_dest = ORIGINALS_DIR / year_month / filename
        resized_dest = PROCESSED_DIR / year_month / filename

        original_dest.parent.mkdir(parents=True, exist_ok=True)
        resized_dest.parent.mkdir(parents=True, exist_ok=True)

        # Resize first: the source must still be in place while it is read.
        _save_resized_copy(filepath, resized_dest)

        # Move original out of incoming
        shutil.move(str(filepath), str(original_dest))

        # Record only after both files are in their final location.
        mark_processed(conn, filename, original_dest, resized_dest)
        log.info("Processed: %s → originals/%s, processed/%s", filename, year_month, year_month)

    except Exception:
        log.exception("Error processing %s", filename)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Watchdog handler
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class PhotoHandler(FileSystemEventHandler):
    """Handles new JPEG files appearing in the incoming directory.

    Before a file is processed the handler waits until its size has stopped
    changing, so that an upload still in progress is not read half-written.
    """

    # Seconds between successive size checks while waiting for an upload.
    SETTLE_INTERVAL = 2
    # Upper bound, in seconds, on how long to wait for a file to stop growing.
    SETTLE_TIMEOUT = 60

    def __init__(self, conn):
        """Store the SQLite connection passed on to ``process_file``."""
        super().__init__()
        self.conn = conn

    def _wait_until_settled(self, path):
        """Block until ``path`` is non-empty and its size is stable.

        The size is sampled every ``SETTLE_INTERVAL`` seconds; the wait ends
        when two consecutive samples match (so a finished file costs one
        interval), when the file disappears, or after ``SETTLE_TIMEOUT``
        seconds, in which case a warning is logged and processing proceeds.
        """
        try:
            previous = os.path.getsize(path)
        except OSError:
            # File vanished (or is unreadable); process_file deals with that.
            return

        deadline = time.monotonic() + self.SETTLE_TIMEOUT
        while time.monotonic() < deadline:
            time.sleep(self.SETTLE_INTERVAL)
            try:
                current = os.path.getsize(path)
            except OSError:
                return
            # A zero-byte file has been created but not written to yet.
            if current == previous and current > 0:
                return
            previous = current

        log.warning(
            "File still changing after %ds, processing anyway: %s",
            self.SETTLE_TIMEOUT,
            path,
        )

    def _handle(self, path, message):
        """Log, wait for the upload to finish, then process ``path`` if it is a JPEG."""
        if not is_jpeg(path):
            return
        log.info(message, path)
        self._wait_until_settled(path)
        process_file(path, self.conn)

    def on_created(self, event):
        """Process a newly created JPEG file; directories are ignored."""
        if event.is_directory:
            return
        self._handle(event.src_path, "New file detected: %s")

    def on_moved(self, event):
        """Handle FTP clients that create temp files then rename."""
        if event.is_directory:
            return
        self._handle(event.dest_path, "Renamed file detected: %s")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def scan_existing(conn):
    """Process JPEGs already sitting in the incoming directory.

    Intended as a catch-up pass after a restart: every regular file directly
    inside ``INCOMING_DIR`` with a JPEG suffix is handed to ``process_file``.

    Args:
        conn: Open SQLite connection passed through to ``process_file``.
    """
    pending = [
        entry
        for entry in list(INCOMING_DIR.glob("*"))
        if entry.is_file() and is_jpeg(entry)
    ]

    if not pending:
        log.info("No existing files in /incoming/")
        return

    log.info("Found %d existing JPEG(s) in /incoming/, processing...", len(pending))
    for entry in pending:
        process_file(entry, conn)
|
|
|
|
|
|
def main():
    """Run the processor: catch-up scan, then watch the incoming directory.

    Blocks until interrupted with Ctrl-C or SIGTERM, then stops the observer,
    waits for it to finish, and closes the database connection.

    Must be called from the main thread, because it installs a signal handler.
    """
    # Local import: the signal module is only needed by this entry point.
    import signal

    log.info("=== Fuji Photo Processor ===")
    log.info("Config: poll=%ds, quality=%d, max_size=%dx%d",
             POLL_INTERVAL, JPEG_QUALITY, MAX_WIDTH, MAX_HEIGHT)
    log.info("Watching: %s", INCOMING_DIR)

    conn = init_db()
    log.info("Database initialized: %s", DB_PATH)

    # Catch-up scan
    scan_existing(conn)

    # Start watching
    handler = PhotoHandler(conn)
    observer = PollingObserver(timeout=POLL_INTERVAL)
    observer.schedule(handler, str(INCOMING_DIR), recursive=False)

    def _on_sigterm(signum, frame):
        # Container runtimes stop a service with SIGTERM; route it through the
        # same shutdown path as Ctrl-C instead of dying without cleanup.
        raise KeyboardInterrupt

    signal.signal(signal.SIGTERM, _on_sigterm)

    observer.start()
    log.info("Watching for new files (polling every %ds)...", POLL_INTERVAL)

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        log.info("Shutting down...")
    finally:
        # Runs on any exit from the wait loop, so the observer thread is
        # always stopped and the connection closed.
        observer.stop()
        observer.join()
        conn.close()
|
|
|
|
|
|
# Run the processor only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|