max sec
This commit is contained in:
parent
7f5bef7247
commit
33e002bf1d
373
immich_geotag_from_owntracks_final_2.py
Normal file
373
immich_geotag_from_owntracks_final_2.py
Normal file
|
|
@ -0,0 +1,373 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
immich_geotag_from_owntracks.py
|
||||
|
||||
Geotaguj zdjęcia na podstawie śladu z OwnTracks (JSON/JSONL/NDJSON/.rec) lub pliku GPX
|
||||
z wykorzystaniem exiftool. Idealne do późniejszego importu / reindeksu w Immich.
|
||||
|
||||
Funkcje:
|
||||
- Obsługa JSON Lines / NDJSON oraz „sklejonych” obiektów JSON.
|
||||
- Domyślne przeszukiwanie katalogu: *.json, *.jsonl, *.ndjson, *.rec
|
||||
- **Bezpiecznik:** domyślnie NIE nadpisuje GPS, jeśli już istnieje w EXIF.
|
||||
(użyj --overwrite-existing-gps aby nadpisać)
|
||||
- **--json-max-age-secs:** jeśli > 0, odrzuca punkty OwnTracks starsze niż (teraz − max_age).
|
||||
|
||||
Wymagania:
|
||||
- Python 3.9+
|
||||
- exiftool (musi być w PATH, np. `apt-get install exiftool`)
|
||||
|
||||
Przykłady:
|
||||
1) Katalog OwnTracks Recorder: owntracks/store/rec (pliki .rec):
|
||||
python3 immich_geotag_from_owntracks.py \
|
||||
--photos /ścieżka/do/zdjęć \
|
||||
--json /ścieżka/do/owntracks/store/rec \
|
||||
--recursive \
|
||||
--write inplace
|
||||
|
||||
2) JSON/JSONL/NDJSON -> GPX -> geotag (bez nadpisywania istniejącego GPS):
|
||||
python3 immich_geotag_from_owntracks.py \
|
||||
--photos /ścieżka/do/zdjęć \
|
||||
--json /ścieżka/do/logów \
|
||||
--interpolate-secs 180 \
|
||||
--time-offset "+00:00" \
|
||||
--json-max-age-secs 86400 \
|
||||
--write inplace
|
||||
|
||||
3) Wymuś nadpisywanie istniejącego GPS:
|
||||
python3 immich_geotag_from_owntracks.py \
|
||||
--photos /ścieżka/do/zdjęć \
|
||||
--json /ścieżka/do/logów \
|
||||
--overwrite-existing-gps \
|
||||
--write inplace
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
from datetime import datetime, timezone, timedelta
|
||||
from pathlib import Path
|
||||
from typing import List, Dict, Any, Optional, Iterable
|
||||
|
||||
def parse_args() -> argparse.Namespace:
|
||||
p = argparse.ArgumentParser(description="Geotaguj zdjęcia na podstawie OwnTracks JSON/JSONL/NDJSON/.rec lub GPX (dla Immich).")
|
||||
g_input = p.add_argument_group("Wejście śladu")
|
||||
g_input.add_argument("--gpx", type=str, help="Ścieżka do pliku GPX (opcjonalnie zamiast --json).")
|
||||
g_input.add_argument("--json", type=str, help="Ścieżka do pliku/katalogu OwnTracks (JSON/JSONL/NDJSON/.rec).")
|
||||
g_input.add_argument(
|
||||
"--json-glob",
|
||||
type=str,
|
||||
default="*.json,*.jsonl,*.ndjson,*.rec",
|
||||
help="Wzorce plików rozdzielone przecinkami gdy --json wskazuje katalog (domyślnie: *.json,*.jsonl,*.ndjson,*.rec)."
|
||||
)
|
||||
|
||||
g_match = p.add_argument_group("Parametry dopasowania czasu")
|
||||
g_match.add_argument("--interpolate-secs", type=int, default=180, help="Maksymalne okno interpolacji (GeoMaxIntSecs) w sekundach (domyślnie 180).")
|
||||
g_match.add_argument("--time-offset", type=str, default="+00:00",
|
||||
help="Korekta czasu EXIF względem GPX, np. '+00:00', '+01:00', '-00:30'. Przekazywana do exiftool jako -geosync.")
|
||||
g_match.add_argument("--json-max-age-secs", type=int, default=0,
|
||||
help="Jeśli > 0, odrzuca punkty OwnTracks starsze niż (teraz − ta liczba sekund). 0 lub mniej — wyłączone.")
|
||||
|
||||
g_photos = p.add_argument_group("Zdjęcia")
|
||||
g_photos.add_argument("--photos", type=str, required=True, help="Katalog lub plik ze zdjęciem(i).")
|
||||
g_photos.add_argument("--recursive", action="store_true", help="Przetwarzaj rekursywnie katalog zdjęć (-r dla exiftool).")
|
||||
|
||||
g_out = p.add_argument_group("Zapis")
|
||||
g_out.add_argument("--write", choices=["inplace", "backup"], default="inplace",
|
||||
help="Tryb zapisu: 'inplace' (overwrite_original) lub 'backup' (domyślny tryb exiftool z plikami *_original).")
|
||||
g_out.add_argument("--dry-run", action="store_true", help="Nie zapisuj EXIF, tylko pokaż co byłoby wykonane.")
|
||||
|
||||
g_misc = p.add_argument_group("Różne")
|
||||
g_misc.add_argument("--tmpdir", type=str, help="Katalog tymczasowy na plik GPX (jeśli generowany z JSON).")
|
||||
g_misc.add_argument("--verbose", action="store_true", help="Więcej logów.")
|
||||
g_misc.add_argument("--overwrite-existing-gps", action="store_true",
|
||||
help="Domyślnie nie nadpisuje istniejącego GPS w EXIF. Dodaj ten przełącznik, aby nadpisać.")
|
||||
|
||||
return p.parse_args()
|
||||
|
||||
def _glob_patterns(directory: Path, patterns_csv: str) -> List[Path]:
|
||||
patterns = [p.strip() for p in patterns_csv.split(",") if p.strip()]
|
||||
files: List[Path] = []
|
||||
for patt in patterns:
|
||||
files.extend(directory.rglob(patt))
|
||||
# deduplikacja i sort
|
||||
return sorted(set(files))
|
||||
|
||||
def discover_json_files(path: Path, patterns_csv: str) -> List[Path]:
|
||||
if path.is_file():
|
||||
return [path]
|
||||
elif path.is_dir():
|
||||
return _glob_patterns(path, patterns_csv)
|
||||
else:
|
||||
raise FileNotFoundError(f"Ścieżka z danymi OwnTracks nie istnieje: {path}")
|
||||
|
||||
def _iter_json_objects_from_string(buf: str) -> Iterable[Any]:
|
||||
"""
|
||||
Parsuje wiele obiektów JSON z jednego stringa:
|
||||
- najpierw próbuje json.loads (pojedynczy obiekt lub lista),
|
||||
- jeśli się nie uda, próbuje JSON Lines/NDJSON (po linii),
|
||||
- jeśli dalej nic, używa sekwencyjnego dekodera raw_decode, by wyciągnąć sklejone obiekty.
|
||||
"""
|
||||
# 1) klasyczny JSON
|
||||
try:
|
||||
obj = json.loads(buf)
|
||||
if isinstance(obj, list):
|
||||
for x in obj:
|
||||
yield x
|
||||
return
|
||||
else:
|
||||
yield obj
|
||||
return
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# 2) JSON Lines (NDJSON)
|
||||
parsed_any = False
|
||||
for line in buf.splitlines():
|
||||
s = line.strip()
|
||||
if not s or s.startswith("#"):
|
||||
continue
|
||||
try:
|
||||
yield json.loads(s)
|
||||
parsed_any = True
|
||||
except Exception:
|
||||
# ignoruj linie, które nie są JSON-em; spróbujemy trybu 3
|
||||
pass
|
||||
|
||||
if parsed_any:
|
||||
return
|
||||
|
||||
# 3) Sklejone obiekty JSON
|
||||
dec = json.JSONDecoder()
|
||||
i = 0
|
||||
n = len(buf)
|
||||
while i < n:
|
||||
while i < n and buf[i].isspace():
|
||||
i += 1
|
||||
if i >= n:
|
||||
break
|
||||
try:
|
||||
obj, j = dec.raw_decode(buf, idx=i)
|
||||
yield obj
|
||||
i = j
|
||||
except Exception:
|
||||
i += 1
|
||||
|
||||
def load_owntracks_points(json_files: List[Path], max_age_secs: int, verbose=False) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Wczytuje punkty OwnTracks z plików w formatach:
|
||||
- JSON (obiekt lub lista),
|
||||
- JSON Lines / NDJSON,
|
||||
- wiele sklejonych obiektów JSON,
|
||||
- .rec (jak wyżej — faktycznie JSON/NDJSON).
|
||||
|
||||
Oczekuje, że każdy obiekt ma klucze: lat, lon, tst (epoch sekundy).
|
||||
Zwraca listę posortowaną po czasie (UTC).
|
||||
"""
|
||||
points: List[Dict[str, Any]] = []
|
||||
|
||||
# Granica wieku (jeśli włączona)
|
||||
cutoff: Optional[datetime] = None
|
||||
if max_age_secs and max_age_secs > 0:
|
||||
cutoff = datetime.now(timezone.utc) - timedelta(seconds=max_age_secs)
|
||||
|
||||
for f in json_files:
|
||||
try:
|
||||
raw = f.read_text(encoding="utf-8")
|
||||
except Exception as e:
|
||||
if verbose:
|
||||
print(f"[WARN] Nie udało się odczytać {f}: {e}", file=sys.stderr)
|
||||
continue
|
||||
|
||||
any_in_file = False
|
||||
for entry in _iter_json_objects_from_string(raw):
|
||||
try:
|
||||
lat = float(entry["lat"])
|
||||
lon = float(entry["lon"])
|
||||
tst = int(entry["tst"]) # epoch sekundy
|
||||
if tst <= 0:
|
||||
continue
|
||||
dt = datetime.fromtimestamp(tst, tz=timezone.utc)
|
||||
# filtr wieku (jeśli włączony)
|
||||
if cutoff is not None and dt < cutoff:
|
||||
continue
|
||||
points.append({"lat": lat, "lon": lon, "time": dt})
|
||||
any_in_file = True
|
||||
except Exception:
|
||||
# brak wymaganych pól/nie ten rekord (np. wpisy activity/waypoints)
|
||||
continue
|
||||
|
||||
if verbose and not any_in_file:
|
||||
print(f"[WARN] Brak poprawnych rekordów lat/lon/tst (i wieku) w pliku: {f}", file=sys.stderr)
|
||||
|
||||
points.sort(key=lambda x: x["time"])
|
||||
|
||||
if verbose:
|
||||
if points:
|
||||
print(f"[INFO] Załadowano {len(points)} punktów OwnTracks. Zakres: {points[0]['time']} .. {points[-1]['time']} (UTC)")
|
||||
else:
|
||||
print("[WARN] Nie znaleziono żadnych punktów OwnTracks (lat/lon/tst) po filtrach.")
|
||||
|
||||
return points
|
||||
|
||||
def write_gpx(points: List[Dict[str, Any]], out_path: Path, verbose=False) -> None:
|
||||
def iso8601(dt: datetime) -> str:
|
||||
return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||
|
||||
with open(out_path, "w", encoding="utf-8") as gpx:
|
||||
gpx.write('<?xml version="1.0" encoding="UTF-8"?>\n')
|
||||
gpx.write('<gpx version="1.1" creator="immich_geotag_from_owntracks.py" xmlns="http://www.topografix.com/GPX/1/1">\n')
|
||||
gpx.write(' <trk>\n')
|
||||
gpx.write(' <name>OwnTracks Export</name>\n')
|
||||
gpx.write(' <trkseg>\n')
|
||||
for pt in points:
|
||||
gpx.write(f' <trkpt lat="{pt["lat"]:.8f}" lon="{pt["lon"]:.8f}"><time>{iso8601(pt["time"])}</time></trkpt>\n')
|
||||
gpx.write(' </trkseg>\n')
|
||||
gpx.write(' </trk>\n')
|
||||
gpx.write('</gpx>\n')
|
||||
|
||||
if verbose:
|
||||
print(f"[INFO] Zapisano GPX: {out_path} ({len(points)} punktów)")
|
||||
|
||||
def ensure_exiftool_available() -> None:
|
||||
try:
|
||||
subprocess.run(["exiftool", "-ver"], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
except (subprocess.CalledProcessError, FileNotFoundError):
|
||||
print("[ERROR] exiftool nie jest dostępny w PATH. Zainstaluj exiftool i spróbuj ponownie.", file=sys.stderr)
|
||||
sys.exit(2)
|
||||
|
||||
def build_exiftool_cmd(
|
||||
gpx_path: Path,
|
||||
photos_path: Path,
|
||||
interpolate_secs: int,
|
||||
time_offset: str,
|
||||
recursive: bool,
|
||||
write_mode: str,
|
||||
dry_run: bool,
|
||||
skip_existing_gps: bool,
|
||||
) -> List[str]:
|
||||
cmd: List[str] = ["exiftool"]
|
||||
if recursive and photos_path.is_dir():
|
||||
cmd.append("-r")
|
||||
|
||||
# Pomijaj pliki, które już mają GPS
|
||||
if skip_existing_gps:
|
||||
cmd.extend(["-if", "not $GPSLatitude"])
|
||||
|
||||
# dopasowanie w oknie czasowym
|
||||
cmd.extend(["-api", f"GeoMaxIntSecs={interpolate_secs}"])
|
||||
|
||||
# korekta czasu
|
||||
if time_offset:
|
||||
cmd.append(f"-geosync={time_offset}")
|
||||
|
||||
# źródło geotagów
|
||||
cmd.extend(["-geotag", str(gpx_path)])
|
||||
|
||||
# zapis
|
||||
if write_mode == "inplace" and not dry_run:
|
||||
cmd.append("-overwrite_original")
|
||||
|
||||
# cel (ostatni argument)
|
||||
cmd.append(str(photos_path))
|
||||
|
||||
# symulacja
|
||||
if dry_run:
|
||||
cmd.insert(1, "-n") # wartości numeryczne bez formatowania
|
||||
cmd.append("-S") # krótsze wyjście
|
||||
cmd.append("-v2") # szczegóły operacji
|
||||
|
||||
return cmd
|
||||
|
||||
def run_exiftool(cmd: List[str], verbose=False) -> int:
|
||||
if verbose:
|
||||
print("[INFO] Uruchamiam:", " ".join(cmd))
|
||||
try:
|
||||
proc = subprocess.run(cmd, check=False)
|
||||
return proc.returncode
|
||||
except FileNotFoundError:
|
||||
print("[ERROR] exiftool nie znaleziony.", file=sys.stderr)
|
||||
return 127
|
||||
|
||||
def main() -> int:
|
||||
args = parse_args()
|
||||
ensure_exiftool_available()
|
||||
|
||||
photos_path = Path(args.photos)
|
||||
if not photos_path.exists():
|
||||
print(f"[ERROR] Ścieżka do zdjęć nie istnieje: {photos_path}", file=sys.stderr)
|
||||
return 1
|
||||
|
||||
# 1) Przygotuj GPX
|
||||
gpx_path: Optional[Path] = None
|
||||
temp_dir: Optional[Path] = None
|
||||
|
||||
if args.gpx:
|
||||
gpx_path = Path(args.gpx)
|
||||
if not gpx_path.exists():
|
||||
print(f"[ERROR] Plik GPX nie istnieje: {gpx_path}", file=sys.stderr)
|
||||
return 1
|
||||
if args.verbose:
|
||||
print(f"[INFO] Używam dostarczonego GPX: {gpx_path}")
|
||||
else:
|
||||
if not args.json:
|
||||
print("[ERROR] Musisz podać --gpx lub --json.", file=sys.stderr)
|
||||
return 1
|
||||
|
||||
json_path = Path(args.json)
|
||||
try:
|
||||
files = discover_json_files(json_path, args.json_glob)
|
||||
except FileNotFoundError as e:
|
||||
print(f"[ERROR] {e}", file=sys.stderr)
|
||||
return 1
|
||||
|
||||
if args.verbose:
|
||||
print(f"[INFO] Znaleziono {len(files)} plików danych (wzorce: {args.json_glob}).")
|
||||
|
||||
points = load_owntracks_points(files, max_age_secs=args.json_max_age_secs, verbose=args.verbose)
|
||||
if not points:
|
||||
print("[ERROR] Brak punktów z OwnTracks po wczytaniu plików (lat/lon/tst) i filtrach.", file=sys.stderr)
|
||||
return 1
|
||||
|
||||
temp_dir = Path(args.tmpdir) if args.tmpdir else Path(tempfile.mkdtemp(prefix="owntracks_gpx_"))
|
||||
temp_dir.mkdir(parents=True, exist_ok=True)
|
||||
gpx_path = temp_dir / "owntracks_export.gpx"
|
||||
write_gpx(points, gpx_path, verbose=args.verbose)
|
||||
|
||||
assert gpx_path is not None
|
||||
|
||||
# 2) Zbuduj i uruchom exiftool
|
||||
skip_existing_gps = not args.overwrite_existing_gps
|
||||
cmd = build_exiftool_cmd(
|
||||
gpx_path=gpx_path,
|
||||
photos_path=photos_path,
|
||||
interpolate_secs=args.interpolate_secs,
|
||||
time_offset=args.time_offset,
|
||||
recursive=args.recursive,
|
||||
write_mode=args.write,
|
||||
dry_run=args.dry_run,
|
||||
skip_existing_gps=skip_existing_gps,
|
||||
)
|
||||
|
||||
rc = run_exiftool(cmd, verbose=args.verbose)
|
||||
if rc != 0:
|
||||
print(f"[ERROR] exiftool zakończył się kodem {rc}.", file=sys.stderr)
|
||||
return rc
|
||||
|
||||
if args.dry_run:
|
||||
print("[OK] Dry-run zakończony. Usuń --dry-run, aby zapisać EXIF.")
|
||||
else:
|
||||
if skip_existing_gps:
|
||||
print("[OK] Geotagowanie zakończone. (Pliki z istniejącym GPS zostały pominięte.)")
|
||||
else:
|
||||
print("[OK] Geotagowanie zakończone. (Istniejący GPS mógł zostać nadpisany.)")
|
||||
|
||||
if temp_dir:
|
||||
print(f"[INFO] Tymczasowy GPX: {gpx_path} (możesz go zachować do archiwum).")
|
||||
|
||||
return 0
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
|
||||
Loading…
Reference in a new issue