todo: make final_3 mix od final and final_2
This commit is contained in:
parent
761d1b8ba7
commit
34e01ff4a2
334
immich_geotag_from_owntracks_final_3.py
Normal file
334
immich_geotag_from_owntracks_final_3.py
Normal file
|
|
@ -0,0 +1,334 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
immich_geotag_from_owntracks.py
|
||||||
|
|
||||||
|
Geotaguj zdjęcia na podstawie śladu z OwnTracks (JSON/JSONL/NDJSON/.rec) lub pliku GPX
|
||||||
|
z wykorzystaniem exiftool. Idealne do późniejszego importu / reindeksu w Immich.
|
||||||
|
|
||||||
|
Nowości:
|
||||||
|
- Obsługa JSON Lines / NDJSON oraz „sklejonych” obiektów JSON.
|
||||||
|
- Domyślne przeszukiwanie katalogu: *.json, *.jsonl, *.ndjson, *.rec
|
||||||
|
- **Bezpiecznik:** domyślnie NIE nadpisuje GPS, jeśli już istnieje w EXIF.
|
||||||
|
(użyj --overwrite-existing-gps aby nadpisać)
|
||||||
|
- **--json-max-age-secs:** jeśli > 0, odrzuca punkty OwnTracks starsze niż (teraz − max_age).
|
||||||
|
|
||||||
|
Wymagania:
|
||||||
|
- Python 3.9+
|
||||||
|
- exiftool (musi być w PATH, np. `apt-get install exiftool`)
|
||||||
|
|
||||||
|
Przykłady:
|
||||||
|
1) Katalog OwnTracks Recorder: owntracks/store/rec (pliki .rec):
|
||||||
|
python3 immich_geotag_from_owntracks.py \
|
||||||
|
--photos /ścieżka/do/zdjęć \
|
||||||
|
--json /ścieżka/do/owntracks/store/rec \
|
||||||
|
--recursive \
|
||||||
|
--write inplace
|
||||||
|
|
||||||
|
2) JSON/JSONL/NDJSON -> GPX -> geotag (bez nadpisywania istniejącego GPS):
|
||||||
|
python3 immich_geotag_from_owntracks.py \
|
||||||
|
--photos /ścieżka/do/zdjęć \
|
||||||
|
--json /ścieżka/do/logów \
|
||||||
|
--interpolate-secs 180 \
|
||||||
|
--time-offset "+00:00" \
|
||||||
|
--json-max-age-secs 86400 \
|
||||||
|
--write inplace
|
||||||
|
|
||||||
|
3) Wymuś nadpisywanie istniejącego GPS:
|
||||||
|
python3 immich_geotag_from_owntracks.py \
|
||||||
|
--photos /ścieżka/do/zdjęć \
|
||||||
|
--json /ścieżka/do/logów \
|
||||||
|
--overwrite-existing-gps \
|
||||||
|
--write inplace
|
||||||
|
"""
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import json
|
||||||
|
import subprocess
|
||||||
|
import sys
|
||||||
|
import tempfile
|
||||||
|
from datetime import datetime, timezone, timedelta
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import List, Dict, Any, Optional, Iterable
|
||||||
|
|
||||||
|
def parse_args() -> argparse.Namespace:
|
||||||
|
p = argparse.ArgumentParser(description="Geotaguj zdjęcia na podstawie OwnTracks JSON/JSONL/NDJSON/.rec lub GPX (dla Immich).")
|
||||||
|
g_input = p.add_argument_group("Wejście śladu")
|
||||||
|
g_input.add_argument("--gpx", type=str, help="Ścieżka do pliku GPX (opcjonalnie zamiast --json).")
|
||||||
|
g_input.add_argument("--json", type=str, help="Ścieżka do pliku/katalogu OwnTracks (JSON/JSONL/NDJSON/.rec).")
|
||||||
|
g_input.add_argument(
|
||||||
|
"--json-glob",
|
||||||
|
type=str,
|
||||||
|
default="*.json,*.jsonl,*.ndjson,*.rec",
|
||||||
|
help="Wzorce plików rozdzielone przecinkami gdy --json wskazuje katalog (domyślnie: *.json,*.jsonl,*.ndjson,*.rec)."
|
||||||
|
)
|
||||||
|
|
||||||
|
g_match = p.add_argument_group("Parametry dopasowania czasu")
|
||||||
|
g_match.add_argument("--interpolate-secs", type=int, default=180, help="Maksymalne okno interpolacji (GeoMaxIntSecs) w sekundach (domyślnie 180).")
|
||||||
|
g_match.add_argument("--time-offset", type=str, default="+00:00",
|
||||||
|
help="Korekta czasu EXIF względem GPX, np. '+00:00', '+01:00', '-00:30'. Przekazywana do exiftool jako -geosync.")
|
||||||
|
g_match.add_argument("--json-max-age-secs", type=int, default=0,
|
||||||
|
help="Jeśli > 0, odrzuca punkty OwnTracks starsze niż (teraz − ta liczba sekund). 0 lub mniej — wyłączone.")
|
||||||
|
|
||||||
|
g_photos = p.add_argument_group("Zdjęcia")
|
||||||
|
g_photos.add_argument("--photos", type=str, required=True, help="Katalog lub plik ze zdjęciem(i).")
|
||||||
|
g_photos.add_argument("--recursive", action="store_true", help="Przetwarzaj rekursywnie katalog zdjęć (-r dla exiftool).")
|
||||||
|
|
||||||
|
g_out = p.add_argument_group("Zapis")
|
||||||
|
g_out.add_argument("--write", choices=["inplace", "backup"], default="inplace",
|
||||||
|
help="Tryb zapisu: 'inplace' (overwrite_original) lub 'backup' (domyślny tryb exiftool z plikami *_original).")
|
||||||
|
g_out.add_argument("--dry-run", action="store_true", help="Nie zapisuj EXIF, tylko pokaż co byłoby wykonane.")
|
||||||
|
|
||||||
|
g_misc = p.add_argument_group("Różne")
|
||||||
|
g_misc.add_argument("--tmpdir", type=str, help="Katalog tymczasowy na plik GPX (jeśli generowany z JSON).")
|
||||||
|
g_misc.add_argument("--verbose", action="store_true", help="Więcej logów.")
|
||||||
|
g_misc.add_argument("--overwrite-existing-gps", action="store_true",
|
||||||
|
help="Domyślnie nie nadpisuje istniejącego GPS w EXIF. Dodaj ten przełącznik, aby nadpisać.")
|
||||||
|
|
||||||
|
return p.parse_args()
|
||||||
|
|
||||||
|
def _glob_patterns(directory: Path, patterns_csv: str) -> List[Path]:
|
||||||
|
patterns = [p.strip() for p in patterns_csv.split(",") if p.strip()]
|
||||||
|
files: List[Path] = []
|
||||||
|
for patt in patterns:
|
||||||
|
files.extend(directory.rglob(patt))
|
||||||
|
# deduplikacja i sort
|
||||||
|
return sorted(set(files))
|
||||||
|
|
||||||
|
def discover_json_files(path: Path, patterns_csv: str) -> List[Path]:
|
||||||
|
if path.is_file():
|
||||||
|
return [path]
|
||||||
|
elif path.is_dir():
|
||||||
|
return _glob_patterns(path, patterns_csv)
|
||||||
|
else:
|
||||||
|
raise FileNotFoundError(f"Ścieżka z danymi OwnTracks nie istnieje: {path}")
|
||||||
|
|
||||||
|
def _iter_json_objects_from_string(buf: str) -> Iterable[Any]:
|
||||||
|
"""Próbuje: klasyczny JSON -> JSONL/NDJSON linia po linii -> sekwencyjne raw_decode."""
|
||||||
|
# 1) klasyczny JSON
|
||||||
|
try:
|
||||||
|
obj = json.loads(buf)
|
||||||
|
if isinstance(obj, list):
|
||||||
|
for x in obj:
|
||||||
|
yield x
|
||||||
|
return
|
||||||
|
else:
|
||||||
|
yield obj
|
||||||
|
return
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
# 2) JSON Lines / NDJSON
|
||||||
|
parsed_any = False
|
||||||
|
for line in buf.splitlines():
|
||||||
|
s = line.strip()
|
||||||
|
if not s or s.startswith("#"):
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
yield json.loads(s)
|
||||||
|
parsed_any = True
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
if parsed_any:
|
||||||
|
return
|
||||||
|
# 3) Wiele sklejonych obiektów JSON
|
||||||
|
dec = json.JSONDecoder()
|
||||||
|
i = 0
|
||||||
|
n = len(buf)
|
||||||
|
while i < n:
|
||||||
|
while i < n and buf[i].isspace():
|
||||||
|
i += 1
|
||||||
|
if i >= n:
|
||||||
|
break
|
||||||
|
try:
|
||||||
|
obj, j = dec.raw_decode(buf, idx=i)
|
||||||
|
yield obj
|
||||||
|
i = j
|
||||||
|
except Exception:
|
||||||
|
i += 1
|
||||||
|
|
||||||
|
def load_owntracks_points(json_files: List[Path], verbose=False) -> List[Dict[str, Any]]:
|
||||||
|
"""Czyta punkty OwnTracks z wielu wariantów plików. Wymaga lat/lon/tst."""
|
||||||
|
points: List[Dict[str, Any]] = []
|
||||||
|
for f in json_files:
|
||||||
|
try:
|
||||||
|
raw = f.read_text(encoding="utf-8")
|
||||||
|
except Exception as e:
|
||||||
|
if verbose:
|
||||||
|
print(f"[WARN] Nie udało się odczytać {f}: {e}", file=sys.stderr)
|
||||||
|
continue
|
||||||
|
any_in_file = False
|
||||||
|
for entry in _iter_json_objects_from_string(raw):
|
||||||
|
try:
|
||||||
|
lat = float(entry["lat"])
|
||||||
|
lon = float(entry["lon"])
|
||||||
|
tst = int(entry["tst"]) # epoch sekundy
|
||||||
|
if tst <= 0:
|
||||||
|
continue
|
||||||
|
dt = datetime.fromtimestamp(tst, tz=timezone.utc)
|
||||||
|
points.append({"lat": lat, "lon": lon, "time": dt})
|
||||||
|
any_in_file = True
|
||||||
|
except Exception:
|
||||||
|
continue
|
||||||
|
if verbose and not any_in_file:
|
||||||
|
print(f"[WARN] Brak poprawnych rekordów lat/lon/tst w pliku: {f}", file=sys.stderr)
|
||||||
|
points.sort(key=lambda x: x["time"])
|
||||||
|
if verbose:
|
||||||
|
if points:
|
||||||
|
print(f"[INFO] Załadowano {len(points)} punktów OwnTracks. Zakres: {points[0]['time']} .. {points[-1]['time']} (UTC)")
|
||||||
|
else:
|
||||||
|
print("[WARN] Nie znaleziono żadnych punktów OwnTracks (lat/lon/tst).")
|
||||||
|
return points
|
||||||
|
|
||||||
|
def write_gpx(points: List[Dict[str, Any]], out_path: Path, verbose=False) -> None:
|
||||||
|
def iso8601(dt: datetime) -> str:
|
||||||
|
return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||||
|
with open(out_path, "w", encoding="utf-8") as gpx:
|
||||||
|
gpx.write('<?xml version="1.0" encoding="UTF-8"?>\n')
|
||||||
|
gpx.write('<gpx version="1.1" creator="immich_geotag_from_owntracks.py" xmlns="http://www.topografix.com/GPX/1/1">\n')
|
||||||
|
gpx.write(' <trk>\n')
|
||||||
|
gpx.write(' <name>OwnTracks Export</name>\n')
|
||||||
|
gpx.write(' <trkseg>\n')
|
||||||
|
for pt in points:
|
||||||
|
gpx.write(f' <trkpt lat="{pt["lat"]:.8f}" lon="{pt["lon"]:.8f}"><time>{iso8601(pt["time"])}</time></trkpt>\n')
|
||||||
|
gpx.write(' </trkseg>\n')
|
||||||
|
gpx.write(' </trk>\n')
|
||||||
|
gpx.write('</gpx>\n')
|
||||||
|
if verbose:
|
||||||
|
print(f"[INFO] Zapisano GPX: {out_path} ({len(points)} punktów)")
|
||||||
|
|
||||||
|
def ensure_exiftool_available() -> None:
|
||||||
|
try:
|
||||||
|
subprocess.run(["exiftool", "-ver"], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||||
|
except (subprocess.CalledProcessError, FileNotFoundError):
|
||||||
|
print("[ERROR] exiftool nie jest dostępny w PATH. Zainstaluj exiftool i spróbuj ponownie.", file=sys.stderr)
|
||||||
|
sys.exit(2)
|
||||||
|
|
||||||
|
def build_exiftool_cmd(
|
||||||
|
gpx_path: Path,
|
||||||
|
photos_path: Path,
|
||||||
|
interpolate_secs: int,
|
||||||
|
time_offset: str,
|
||||||
|
recursive: bool,
|
||||||
|
write_mode: str,
|
||||||
|
dry_run: bool,
|
||||||
|
skip_existing_gps: bool,
|
||||||
|
) -> List[str]:
|
||||||
|
cmd: List[str] = ["exiftool"]
|
||||||
|
if recursive and photos_path.is_dir():
|
||||||
|
cmd.append("-r")
|
||||||
|
|
||||||
|
# Pomijaj pliki, które już mają GPS
|
||||||
|
if skip_existing_gps:
|
||||||
|
cmd.extend(["-if", "not $GPSLatitude"])
|
||||||
|
|
||||||
|
# dopasowanie w oknie czasowym
|
||||||
|
cmd.extend(["-api", f"GeoMaxIntSecs={interpolate_secs}"])
|
||||||
|
|
||||||
|
# korekta czasu
|
||||||
|
if time_offset:
|
||||||
|
cmd.append(f"-geosync={time_offset}")
|
||||||
|
|
||||||
|
# źródło geotagów
|
||||||
|
cmd.extend(["-geotag", str(gpx_path)])
|
||||||
|
|
||||||
|
# zapis
|
||||||
|
if write_mode == "inplace" and not dry_run:
|
||||||
|
cmd.append("-overwrite_original")
|
||||||
|
|
||||||
|
# cel (ostatni argument)
|
||||||
|
cmd.append(str(photos_path))
|
||||||
|
|
||||||
|
# symulacja
|
||||||
|
if dry_run:
|
||||||
|
cmd.insert(1, "-n") # wartości numeryczne bez formatowania
|
||||||
|
cmd.append("-S") # krótsze wyjście
|
||||||
|
cmd.append("-v2") # szczegóły operacji
|
||||||
|
|
||||||
|
return cmd
|
||||||
|
|
||||||
|
def run_exiftool(cmd: List[str], verbose=False) -> int:
|
||||||
|
if verbose:
|
||||||
|
print("[INFO] Uruchamiam:", " ".join(cmd))
|
||||||
|
try:
|
||||||
|
proc = subprocess.run(cmd, check=False)
|
||||||
|
return proc.returncode
|
||||||
|
except FileNotFoundError:
|
||||||
|
print("[ERROR] exiftool nie znaleziony.", file=sys.stderr)
|
||||||
|
return 127
|
||||||
|
|
||||||
|
def main() -> int:
|
||||||
|
args = parse_args()
|
||||||
|
ensure_exiftool_available()
|
||||||
|
|
||||||
|
photos_path = Path(args.photos)
|
||||||
|
if not photos_path.exists():
|
||||||
|
print(f"[ERROR] Ścieżka do zdjęć nie istnieje: {photos_path}", file=sys.stderr)
|
||||||
|
return 1
|
||||||
|
|
||||||
|
# 1) Przygotuj GPX
|
||||||
|
gpx_path: Optional[Path] = None
|
||||||
|
temp_dir: Optional[Path] = None
|
||||||
|
|
||||||
|
if args.gpx:
|
||||||
|
gpx_path = Path(args.gpx)
|
||||||
|
if not gpx_path.exists():
|
||||||
|
print(f"[ERROR] Plik GPX nie istnieje: {gpx_path}", file=sys.stderr)
|
||||||
|
return 1
|
||||||
|
if args.verbose:
|
||||||
|
print(f"[INFO] Używam dostarczonego GPX: {gpx_path}")
|
||||||
|
else:
|
||||||
|
if not args.json:
|
||||||
|
print("[ERROR] Musisz podać --gpx lub --json.", file=sys.stderr)
|
||||||
|
return 1
|
||||||
|
|
||||||
|
json_path = Path(args.json)
|
||||||
|
try:
|
||||||
|
files = discover_json_files(json_path, args.json_glob)
|
||||||
|
except FileNotFoundError as e:
|
||||||
|
print(f"[ERROR] {e}", file=sys.stderr)
|
||||||
|
return 1
|
||||||
|
|
||||||
|
if args.verbose:
|
||||||
|
print(f"[INFO] Znaleziono {len(files)} plików danych (wzorce: {args.json_glob}).")
|
||||||
|
|
||||||
|
points = load_owntracks_points(files, verbose=args.verbose)
|
||||||
|
if not points:
|
||||||
|
print("[ERROR] Brak punktów z OwnTracks po wczytaniu plików (lat/lon/tst nie znaleziono).", file=sys.stderr)
|
||||||
|
return 1
|
||||||
|
|
||||||
|
temp_dir = Path(args.tmpdir) if args.tmpdir else Path(tempfile.mkdtemp(prefix="owntracks_gpx_"))
|
||||||
|
temp_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
gpx_path = temp_dir / "owntracks_export.gpx"
|
||||||
|
write_gpx(points, gpx_path, verbose=args.verbose)
|
||||||
|
|
||||||
|
assert gpx_path is not None
|
||||||
|
|
||||||
|
# 2) Zbuduj i uruchom exiftool
|
||||||
|
skip_existing_gps = not args.overwrite_existing_gps
|
||||||
|
cmd = build_exiftool_cmd(
|
||||||
|
gpx_path=gpx_path,
|
||||||
|
photos_path=photos_path,
|
||||||
|
interpolate_secs=args.interpolate_secs,
|
||||||
|
time_offset=args.time_offset,
|
||||||
|
recursive=args.recursive,
|
||||||
|
write_mode=args.write,
|
||||||
|
dry_run=args.dry_run,
|
||||||
|
skip_existing_gps=skip_existing_gps,
|
||||||
|
)
|
||||||
|
|
||||||
|
rc = run_exiftool(cmd, verbose=args.verbose)
|
||||||
|
if rc != 0:
|
||||||
|
print(f"[ERROR] exiftool zakończył się kodem {rc}.", file=sys.stderr)
|
||||||
|
return rc
|
||||||
|
|
||||||
|
if args.dry_run:
|
||||||
|
print("[OK] Dry-run zakończony. Wygląda dobrze. Usuń --dry-run, aby zapisać EXIF.")
|
||||||
|
else:
|
||||||
|
print("[OK] Geotagowanie zakończone sukcesem. (Pliki z istniejącym GPS zostały pominięte.)")
|
||||||
|
|
||||||
|
if temp_dir:
|
||||||
|
print(f"[INFO] Tymczasowy GPX: {gpx_path} (możesz go zachować do archiwum).")
|
||||||
|
|
||||||
|
return 0
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
sys.exit(main())
|
||||||
Loading…
Reference in a new issue