fixes
This commit is contained in:
parent
34e01ff4a2
commit
7925de86c6
|
|
@ -2,7 +2,7 @@
|
||||||
|
|
||||||
set -x
|
set -x
|
||||||
|
|
||||||
echo running immich_geotag_from_owntracks_final_2.py
|
echo running immich_geotag_from_owntracks_final.py
|
||||||
|
|
||||||
photoDir="$1"
|
photoDir="$1"
|
||||||
|
|
||||||
|
|
@ -11,5 +11,6 @@ python3 immich_geotag_from_owntracks_final_2.py \
|
||||||
--json /home/pi/own-tracks/owntracks/store/rec/owntracksusr/oskar \
|
--json /home/pi/own-tracks/owntracks/store/rec/owntracksusr/oskar \
|
||||||
--json-glob "*.rec" \
|
--json-glob "*.rec" \
|
||||||
--recursive --dry-run --verbose \
|
--recursive --dry-run --verbose \
|
||||||
--json-max-age-secs 3600 \
|
--interpolate-secs 43200 \
|
||||||
|
--extrapolate-secs 43200 \
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,373 +0,0 @@
|
||||||
#!/usr/bin/env python3
|
|
||||||
"""
|
|
||||||
immich_geotag_from_owntracks.py
|
|
||||||
|
|
||||||
Geotaguj zdjęcia na podstawie śladu z OwnTracks (JSON/JSONL/NDJSON/.rec) lub pliku GPX
|
|
||||||
z wykorzystaniem exiftool. Idealne do późniejszego importu / reindeksu w Immich.
|
|
||||||
|
|
||||||
Funkcje:
|
|
||||||
- Obsługa JSON Lines / NDJSON oraz „sklejonych” obiektów JSON.
|
|
||||||
- Domyślne przeszukiwanie katalogu: *.json, *.jsonl, *.ndjson, *.rec
|
|
||||||
- **Bezpiecznik:** domyślnie NIE nadpisuje GPS, jeśli już istnieje w EXIF.
|
|
||||||
(użyj --overwrite-existing-gps aby nadpisać)
|
|
||||||
- **--json-max-age-secs:** jeśli > 0, odrzuca punkty OwnTracks starsze niż (teraz − max_age).
|
|
||||||
|
|
||||||
Wymagania:
|
|
||||||
- Python 3.9+
|
|
||||||
- exiftool (musi być w PATH, np. `apt-get install exiftool`)
|
|
||||||
|
|
||||||
Przykłady:
|
|
||||||
1) Katalog OwnTracks Recorder: owntracks/store/rec (pliki .rec):
|
|
||||||
python3 immich_geotag_from_owntracks.py \
|
|
||||||
--photos /ścieżka/do/zdjęć \
|
|
||||||
--json /ścieżka/do/owntracks/store/rec \
|
|
||||||
--recursive \
|
|
||||||
--write inplace
|
|
||||||
|
|
||||||
2) JSON/JSONL/NDJSON -> GPX -> geotag (bez nadpisywania istniejącego GPS):
|
|
||||||
python3 immich_geotag_from_owntracks.py \
|
|
||||||
--photos /ścieżka/do/zdjęć \
|
|
||||||
--json /ścieżka/do/logów \
|
|
||||||
--interpolate-secs 180 \
|
|
||||||
--time-offset "+00:00" \
|
|
||||||
--json-max-age-secs 86400 \
|
|
||||||
--write inplace
|
|
||||||
|
|
||||||
3) Wymuś nadpisywanie istniejącego GPS:
|
|
||||||
python3 immich_geotag_from_owntracks.py \
|
|
||||||
--photos /ścieżka/do/zdjęć \
|
|
||||||
--json /ścieżka/do/logów \
|
|
||||||
--overwrite-existing-gps \
|
|
||||||
--write inplace
|
|
||||||
"""
|
|
||||||
|
|
||||||
import argparse
|
|
||||||
import json
|
|
||||||
import subprocess
|
|
||||||
import sys
|
|
||||||
import tempfile
|
|
||||||
from datetime import datetime, timezone, timedelta
|
|
||||||
from pathlib import Path
|
|
||||||
from typing import List, Dict, Any, Optional, Iterable
|
|
||||||
|
|
||||||
def parse_args() -> argparse.Namespace:
|
|
||||||
p = argparse.ArgumentParser(description="Geotaguj zdjęcia na podstawie OwnTracks JSON/JSONL/NDJSON/.rec lub GPX (dla Immich).")
|
|
||||||
g_input = p.add_argument_group("Wejście śladu")
|
|
||||||
g_input.add_argument("--gpx", type=str, help="Ścieżka do pliku GPX (opcjonalnie zamiast --json).")
|
|
||||||
g_input.add_argument("--json", type=str, help="Ścieżka do pliku/katalogu OwnTracks (JSON/JSONL/NDJSON/.rec).")
|
|
||||||
g_input.add_argument(
|
|
||||||
"--json-glob",
|
|
||||||
type=str,
|
|
||||||
default="*.json,*.jsonl,*.ndjson,*.rec",
|
|
||||||
help="Wzorce plików rozdzielone przecinkami gdy --json wskazuje katalog (domyślnie: *.json,*.jsonl,*.ndjson,*.rec)."
|
|
||||||
)
|
|
||||||
|
|
||||||
g_match = p.add_argument_group("Parametry dopasowania czasu")
|
|
||||||
g_match.add_argument("--interpolate-secs", type=int, default=180, help="Maksymalne okno interpolacji (GeoMaxIntSecs) w sekundach (domyślnie 180).")
|
|
||||||
g_match.add_argument("--time-offset", type=str, default="+00:00",
|
|
||||||
help="Korekta czasu EXIF względem GPX, np. '+00:00', '+01:00', '-00:30'. Przekazywana do exiftool jako -geosync.")
|
|
||||||
g_match.add_argument("--json-max-age-secs", type=int, default=0,
|
|
||||||
help="Jeśli > 0, odrzuca punkty OwnTracks starsze niż (teraz − ta liczba sekund). 0 lub mniej — wyłączone.")
|
|
||||||
|
|
||||||
g_photos = p.add_argument_group("Zdjęcia")
|
|
||||||
g_photos.add_argument("--photos", type=str, required=True, help="Katalog lub plik ze zdjęciem(i).")
|
|
||||||
g_photos.add_argument("--recursive", action="store_true", help="Przetwarzaj rekursywnie katalog zdjęć (-r dla exiftool).")
|
|
||||||
|
|
||||||
g_out = p.add_argument_group("Zapis")
|
|
||||||
g_out.add_argument("--write", choices=["inplace", "backup"], default="inplace",
|
|
||||||
help="Tryb zapisu: 'inplace' (overwrite_original) lub 'backup' (domyślny tryb exiftool z plikami *_original).")
|
|
||||||
g_out.add_argument("--dry-run", action="store_true", help="Nie zapisuj EXIF, tylko pokaż co byłoby wykonane.")
|
|
||||||
|
|
||||||
g_misc = p.add_argument_group("Różne")
|
|
||||||
g_misc.add_argument("--tmpdir", type=str, help="Katalog tymczasowy na plik GPX (jeśli generowany z JSON).")
|
|
||||||
g_misc.add_argument("--verbose", action="store_true", help="Więcej logów.")
|
|
||||||
g_misc.add_argument("--overwrite-existing-gps", action="store_true",
|
|
||||||
help="Domyślnie nie nadpisuje istniejącego GPS w EXIF. Dodaj ten przełącznik, aby nadpisać.")
|
|
||||||
|
|
||||||
return p.parse_args()
|
|
||||||
|
|
||||||
def _glob_patterns(directory: Path, patterns_csv: str) -> List[Path]:
|
|
||||||
patterns = [p.strip() for p in patterns_csv.split(",") if p.strip()]
|
|
||||||
files: List[Path] = []
|
|
||||||
for patt in patterns:
|
|
||||||
files.extend(directory.rglob(patt))
|
|
||||||
# deduplikacja i sort
|
|
||||||
return sorted(set(files))
|
|
||||||
|
|
||||||
def discover_json_files(path: Path, patterns_csv: str) -> List[Path]:
|
|
||||||
if path.is_file():
|
|
||||||
return [path]
|
|
||||||
elif path.is_dir():
|
|
||||||
return _glob_patterns(path, patterns_csv)
|
|
||||||
else:
|
|
||||||
raise FileNotFoundError(f"Ścieżka z danymi OwnTracks nie istnieje: {path}")
|
|
||||||
|
|
||||||
def _iter_json_objects_from_string(buf: str) -> Iterable[Any]:
|
|
||||||
"""
|
|
||||||
Parsuje wiele obiektów JSON z jednego stringa:
|
|
||||||
- najpierw próbuje json.loads (pojedynczy obiekt lub lista),
|
|
||||||
- jeśli się nie uda, próbuje JSON Lines/NDJSON (po linii),
|
|
||||||
- jeśli dalej nic, używa sekwencyjnego dekodera raw_decode, by wyciągnąć sklejone obiekty.
|
|
||||||
"""
|
|
||||||
# 1) klasyczny JSON
|
|
||||||
try:
|
|
||||||
obj = json.loads(buf)
|
|
||||||
if isinstance(obj, list):
|
|
||||||
for x in obj:
|
|
||||||
yield x
|
|
||||||
return
|
|
||||||
else:
|
|
||||||
yield obj
|
|
||||||
return
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
# 2) JSON Lines (NDJSON)
|
|
||||||
parsed_any = False
|
|
||||||
for line in buf.splitlines():
|
|
||||||
s = line.strip()
|
|
||||||
if not s or s.startswith("#"):
|
|
||||||
continue
|
|
||||||
try:
|
|
||||||
yield json.loads(s)
|
|
||||||
parsed_any = True
|
|
||||||
except Exception:
|
|
||||||
# ignoruj linie, które nie są JSON-em; spróbujemy trybu 3
|
|
||||||
pass
|
|
||||||
|
|
||||||
if parsed_any:
|
|
||||||
return
|
|
||||||
|
|
||||||
# 3) Sklejone obiekty JSON
|
|
||||||
dec = json.JSONDecoder()
|
|
||||||
i = 0
|
|
||||||
n = len(buf)
|
|
||||||
while i < n:
|
|
||||||
while i < n and buf[i].isspace():
|
|
||||||
i += 1
|
|
||||||
if i >= n:
|
|
||||||
break
|
|
||||||
try:
|
|
||||||
obj, j = dec.raw_decode(buf, idx=i)
|
|
||||||
yield obj
|
|
||||||
i = j
|
|
||||||
except Exception:
|
|
||||||
i += 1
|
|
||||||
|
|
||||||
def load_owntracks_points(json_files: List[Path], max_age_secs: int, verbose=False) -> List[Dict[str, Any]]:
|
|
||||||
"""
|
|
||||||
Wczytuje punkty OwnTracks z plików w formatach:
|
|
||||||
- JSON (obiekt lub lista),
|
|
||||||
- JSON Lines / NDJSON,
|
|
||||||
- wiele sklejonych obiektów JSON,
|
|
||||||
- .rec (jak wyżej — faktycznie JSON/NDJSON).
|
|
||||||
|
|
||||||
Oczekuje, że każdy obiekt ma klucze: lat, lon, tst (epoch sekundy).
|
|
||||||
Zwraca listę posortowaną po czasie (UTC).
|
|
||||||
"""
|
|
||||||
points: List[Dict[str, Any]] = []
|
|
||||||
|
|
||||||
# Granica wieku (jeśli włączona)
|
|
||||||
cutoff: Optional[datetime] = None
|
|
||||||
if max_age_secs and max_age_secs > 0:
|
|
||||||
cutoff = datetime.now(timezone.utc) - timedelta(seconds=max_age_secs)
|
|
||||||
|
|
||||||
for f in json_files:
|
|
||||||
try:
|
|
||||||
raw = f.read_text(encoding="utf-8")
|
|
||||||
except Exception as e:
|
|
||||||
if verbose:
|
|
||||||
print(f"[WARN] Nie udało się odczytać {f}: {e}", file=sys.stderr)
|
|
||||||
continue
|
|
||||||
|
|
||||||
any_in_file = False
|
|
||||||
for entry in _iter_json_objects_from_string(raw):
|
|
||||||
try:
|
|
||||||
lat = float(entry["lat"])
|
|
||||||
lon = float(entry["lon"])
|
|
||||||
tst = int(entry["tst"]) # epoch sekundy
|
|
||||||
if tst <= 0:
|
|
||||||
continue
|
|
||||||
dt = datetime.fromtimestamp(tst, tz=timezone.utc)
|
|
||||||
# filtr wieku (jeśli włączony)
|
|
||||||
if cutoff is not None and dt < cutoff:
|
|
||||||
continue
|
|
||||||
points.append({"lat": lat, "lon": lon, "time": dt})
|
|
||||||
any_in_file = True
|
|
||||||
except Exception:
|
|
||||||
# brak wymaganych pól/nie ten rekord (np. wpisy activity/waypoints)
|
|
||||||
continue
|
|
||||||
|
|
||||||
if verbose and not any_in_file:
|
|
||||||
print(f"[WARN] Brak poprawnych rekordów lat/lon/tst (i wieku) w pliku: {f}", file=sys.stderr)
|
|
||||||
|
|
||||||
points.sort(key=lambda x: x["time"])
|
|
||||||
|
|
||||||
if verbose:
|
|
||||||
if points:
|
|
||||||
print(f"[INFO] Załadowano {len(points)} punktów OwnTracks. Zakres: {points[0]['time']} .. {points[-1]['time']} (UTC)")
|
|
||||||
else:
|
|
||||||
print("[WARN] Nie znaleziono żadnych punktów OwnTracks (lat/lon/tst) po filtrach.")
|
|
||||||
|
|
||||||
return points
|
|
||||||
|
|
||||||
def write_gpx(points: List[Dict[str, Any]], out_path: Path, verbose=False) -> None:
|
|
||||||
def iso8601(dt: datetime) -> str:
|
|
||||||
return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
|
|
||||||
|
|
||||||
with open(out_path, "w", encoding="utf-8") as gpx:
|
|
||||||
gpx.write('<?xml version="1.0" encoding="UTF-8"?>\n')
|
|
||||||
gpx.write('<gpx version="1.1" creator="immich_geotag_from_owntracks.py" xmlns="http://www.topografix.com/GPX/1/1">\n')
|
|
||||||
gpx.write(' <trk>\n')
|
|
||||||
gpx.write(' <name>OwnTracks Export</name>\n')
|
|
||||||
gpx.write(' <trkseg>\n')
|
|
||||||
for pt in points:
|
|
||||||
gpx.write(f' <trkpt lat="{pt["lat"]:.8f}" lon="{pt["lon"]:.8f}"><time>{iso8601(pt["time"])}</time></trkpt>\n')
|
|
||||||
gpx.write(' </trkseg>\n')
|
|
||||||
gpx.write(' </trk>\n')
|
|
||||||
gpx.write('</gpx>\n')
|
|
||||||
|
|
||||||
if verbose:
|
|
||||||
print(f"[INFO] Zapisano GPX: {out_path} ({len(points)} punktów)")
|
|
||||||
|
|
||||||
def ensure_exiftool_available() -> None:
|
|
||||||
try:
|
|
||||||
subprocess.run(["exiftool", "-ver"], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
||||||
except (subprocess.CalledProcessError, FileNotFoundError):
|
|
||||||
print("[ERROR] exiftool nie jest dostępny w PATH. Zainstaluj exiftool i spróbuj ponownie.", file=sys.stderr)
|
|
||||||
sys.exit(2)
|
|
||||||
|
|
||||||
def build_exiftool_cmd(
|
|
||||||
gpx_path: Path,
|
|
||||||
photos_path: Path,
|
|
||||||
interpolate_secs: int,
|
|
||||||
time_offset: str,
|
|
||||||
recursive: bool,
|
|
||||||
write_mode: str,
|
|
||||||
dry_run: bool,
|
|
||||||
skip_existing_gps: bool,
|
|
||||||
) -> List[str]:
|
|
||||||
cmd: List[str] = ["exiftool"]
|
|
||||||
if recursive and photos_path.is_dir():
|
|
||||||
cmd.append("-r")
|
|
||||||
|
|
||||||
# Pomijaj pliki, które już mają GPS
|
|
||||||
if skip_existing_gps:
|
|
||||||
cmd.extend(["-if", "not $GPSLatitude"])
|
|
||||||
|
|
||||||
# dopasowanie w oknie czasowym
|
|
||||||
cmd.extend(["-api", f"GeoMaxIntSecs={interpolate_secs}"])
|
|
||||||
|
|
||||||
# korekta czasu
|
|
||||||
if time_offset:
|
|
||||||
cmd.append(f"-geosync={time_offset}")
|
|
||||||
|
|
||||||
# źródło geotagów
|
|
||||||
cmd.extend(["-geotag", str(gpx_path)])
|
|
||||||
|
|
||||||
# zapis
|
|
||||||
if write_mode == "inplace" and not dry_run:
|
|
||||||
cmd.append("-overwrite_original")
|
|
||||||
|
|
||||||
# cel (ostatni argument)
|
|
||||||
cmd.append(str(photos_path))
|
|
||||||
|
|
||||||
# symulacja
|
|
||||||
if dry_run:
|
|
||||||
cmd.insert(1, "-n") # wartości numeryczne bez formatowania
|
|
||||||
cmd.append("-S") # krótsze wyjście
|
|
||||||
cmd.append("-v2") # szczegóły operacji
|
|
||||||
|
|
||||||
return cmd
|
|
||||||
|
|
||||||
def run_exiftool(cmd: List[str], verbose=False) -> int:
|
|
||||||
if verbose:
|
|
||||||
print("[INFO] Uruchamiam:", " ".join(cmd))
|
|
||||||
try:
|
|
||||||
proc = subprocess.run(cmd, check=False)
|
|
||||||
return proc.returncode
|
|
||||||
except FileNotFoundError:
|
|
||||||
print("[ERROR] exiftool nie znaleziony.", file=sys.stderr)
|
|
||||||
return 127
|
|
||||||
|
|
||||||
def main() -> int:
|
|
||||||
args = parse_args()
|
|
||||||
ensure_exiftool_available()
|
|
||||||
|
|
||||||
photos_path = Path(args.photos)
|
|
||||||
if not photos_path.exists():
|
|
||||||
print(f"[ERROR] Ścieżka do zdjęć nie istnieje: {photos_path}", file=sys.stderr)
|
|
||||||
return 1
|
|
||||||
|
|
||||||
# 1) Przygotuj GPX
|
|
||||||
gpx_path: Optional[Path] = None
|
|
||||||
temp_dir: Optional[Path] = None
|
|
||||||
|
|
||||||
if args.gpx:
|
|
||||||
gpx_path = Path(args.gpx)
|
|
||||||
if not gpx_path.exists():
|
|
||||||
print(f"[ERROR] Plik GPX nie istnieje: {gpx_path}", file=sys.stderr)
|
|
||||||
return 1
|
|
||||||
if args.verbose:
|
|
||||||
print(f"[INFO] Używam dostarczonego GPX: {gpx_path}")
|
|
||||||
else:
|
|
||||||
if not args.json:
|
|
||||||
print("[ERROR] Musisz podać --gpx lub --json.", file=sys.stderr)
|
|
||||||
return 1
|
|
||||||
|
|
||||||
json_path = Path(args.json)
|
|
||||||
try:
|
|
||||||
files = discover_json_files(json_path, args.json_glob)
|
|
||||||
except FileNotFoundError as e:
|
|
||||||
print(f"[ERROR] {e}", file=sys.stderr)
|
|
||||||
return 1
|
|
||||||
|
|
||||||
if args.verbose:
|
|
||||||
print(f"[INFO] Znaleziono {len(files)} plików danych (wzorce: {args.json_glob}).")
|
|
||||||
|
|
||||||
points = load_owntracks_points(files, max_age_secs=args.json_max_age_secs, verbose=args.verbose)
|
|
||||||
if not points:
|
|
||||||
print("[ERROR] Brak punktów z OwnTracks po wczytaniu plików (lat/lon/tst) i filtrach.", file=sys.stderr)
|
|
||||||
return 1
|
|
||||||
|
|
||||||
temp_dir = Path(args.tmpdir) if args.tmpdir else Path(tempfile.mkdtemp(prefix="owntracks_gpx_"))
|
|
||||||
temp_dir.mkdir(parents=True, exist_ok=True)
|
|
||||||
gpx_path = temp_dir / "owntracks_export.gpx"
|
|
||||||
write_gpx(points, gpx_path, verbose=args.verbose)
|
|
||||||
|
|
||||||
assert gpx_path is not None
|
|
||||||
|
|
||||||
# 2) Zbuduj i uruchom exiftool
|
|
||||||
skip_existing_gps = not args.overwrite_existing_gps
|
|
||||||
cmd = build_exiftool_cmd(
|
|
||||||
gpx_path=gpx_path,
|
|
||||||
photos_path=photos_path,
|
|
||||||
interpolate_secs=args.interpolate_secs,
|
|
||||||
time_offset=args.time_offset,
|
|
||||||
recursive=args.recursive,
|
|
||||||
write_mode=args.write,
|
|
||||||
dry_run=args.dry_run,
|
|
||||||
skip_existing_gps=skip_existing_gps,
|
|
||||||
)
|
|
||||||
|
|
||||||
rc = run_exiftool(cmd, verbose=args.verbose)
|
|
||||||
if rc != 0:
|
|
||||||
print(f"[ERROR] exiftool zakończył się kodem {rc}.", file=sys.stderr)
|
|
||||||
return rc
|
|
||||||
|
|
||||||
if args.dry_run:
|
|
||||||
print("[OK] Dry-run zakończony. Usuń --dry-run, aby zapisać EXIF.")
|
|
||||||
else:
|
|
||||||
if skip_existing_gps:
|
|
||||||
print("[OK] Geotagowanie zakończone. (Pliki z istniejącym GPS zostały pominięte.)")
|
|
||||||
else:
|
|
||||||
print("[OK] Geotagowanie zakończone. (Istniejący GPS mógł zostać nadpisany.)")
|
|
||||||
|
|
||||||
if temp_dir:
|
|
||||||
print(f"[INFO] Tymczasowy GPX: {gpx_path} (możesz go zachować do archiwum).")
|
|
||||||
|
|
||||||
return 0
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
sys.exit(main())
|
|
||||||
|
|
||||||
|
|
@ -1,334 +0,0 @@
|
||||||
#!/usr/bin/env python3
|
|
||||||
"""
|
|
||||||
immich_geotag_from_owntracks.py
|
|
||||||
|
|
||||||
Geotaguj zdjęcia na podstawie śladu z OwnTracks (JSON/JSONL/NDJSON/.rec) lub pliku GPX
|
|
||||||
z wykorzystaniem exiftool. Idealne do późniejszego importu / reindeksu w Immich.
|
|
||||||
|
|
||||||
Nowości:
|
|
||||||
- Obsługa JSON Lines / NDJSON oraz „sklejonych” obiektów JSON.
|
|
||||||
- Domyślne przeszukiwanie katalogu: *.json, *.jsonl, *.ndjson, *.rec
|
|
||||||
- **Bezpiecznik:** domyślnie NIE nadpisuje GPS, jeśli już istnieje w EXIF.
|
|
||||||
(użyj --overwrite-existing-gps aby nadpisać)
|
|
||||||
- **--json-max-age-secs:** jeśli > 0, odrzuca punkty OwnTracks starsze niż (teraz − max_age).
|
|
||||||
|
|
||||||
Wymagania:
|
|
||||||
- Python 3.9+
|
|
||||||
- exiftool (musi być w PATH, np. `apt-get install exiftool`)
|
|
||||||
|
|
||||||
Przykłady:
|
|
||||||
1) Katalog OwnTracks Recorder: owntracks/store/rec (pliki .rec):
|
|
||||||
python3 immich_geotag_from_owntracks.py \
|
|
||||||
--photos /ścieżka/do/zdjęć \
|
|
||||||
--json /ścieżka/do/owntracks/store/rec \
|
|
||||||
--recursive \
|
|
||||||
--write inplace
|
|
||||||
|
|
||||||
2) JSON/JSONL/NDJSON -> GPX -> geotag (bez nadpisywania istniejącego GPS):
|
|
||||||
python3 immich_geotag_from_owntracks.py \
|
|
||||||
--photos /ścieżka/do/zdjęć \
|
|
||||||
--json /ścieżka/do/logów \
|
|
||||||
--interpolate-secs 180 \
|
|
||||||
--time-offset "+00:00" \
|
|
||||||
--json-max-age-secs 86400 \
|
|
||||||
--write inplace
|
|
||||||
|
|
||||||
3) Wymuś nadpisywanie istniejącego GPS:
|
|
||||||
python3 immich_geotag_from_owntracks.py \
|
|
||||||
--photos /ścieżka/do/zdjęć \
|
|
||||||
--json /ścieżka/do/logów \
|
|
||||||
--overwrite-existing-gps \
|
|
||||||
--write inplace
|
|
||||||
"""
|
|
||||||
|
|
||||||
import argparse
|
|
||||||
import json
|
|
||||||
import subprocess
|
|
||||||
import sys
|
|
||||||
import tempfile
|
|
||||||
from datetime import datetime, timezone, timedelta
|
|
||||||
from pathlib import Path
|
|
||||||
from typing import List, Dict, Any, Optional, Iterable
|
|
||||||
|
|
||||||
def parse_args() -> argparse.Namespace:
|
|
||||||
p = argparse.ArgumentParser(description="Geotaguj zdjęcia na podstawie OwnTracks JSON/JSONL/NDJSON/.rec lub GPX (dla Immich).")
|
|
||||||
g_input = p.add_argument_group("Wejście śladu")
|
|
||||||
g_input.add_argument("--gpx", type=str, help="Ścieżka do pliku GPX (opcjonalnie zamiast --json).")
|
|
||||||
g_input.add_argument("--json", type=str, help="Ścieżka do pliku/katalogu OwnTracks (JSON/JSONL/NDJSON/.rec).")
|
|
||||||
g_input.add_argument(
|
|
||||||
"--json-glob",
|
|
||||||
type=str,
|
|
||||||
default="*.json,*.jsonl,*.ndjson,*.rec",
|
|
||||||
help="Wzorce plików rozdzielone przecinkami gdy --json wskazuje katalog (domyślnie: *.json,*.jsonl,*.ndjson,*.rec)."
|
|
||||||
)
|
|
||||||
|
|
||||||
g_match = p.add_argument_group("Parametry dopasowania czasu")
|
|
||||||
g_match.add_argument("--interpolate-secs", type=int, default=180, help="Maksymalne okno interpolacji (GeoMaxIntSecs) w sekundach (domyślnie 180).")
|
|
||||||
g_match.add_argument("--time-offset", type=str, default="+00:00",
|
|
||||||
help="Korekta czasu EXIF względem GPX, np. '+00:00', '+01:00', '-00:30'. Przekazywana do exiftool jako -geosync.")
|
|
||||||
g_match.add_argument("--json-max-age-secs", type=int, default=0,
|
|
||||||
help="Jeśli > 0, odrzuca punkty OwnTracks starsze niż (teraz − ta liczba sekund). 0 lub mniej — wyłączone.")
|
|
||||||
|
|
||||||
g_photos = p.add_argument_group("Zdjęcia")
|
|
||||||
g_photos.add_argument("--photos", type=str, required=True, help="Katalog lub plik ze zdjęciem(i).")
|
|
||||||
g_photos.add_argument("--recursive", action="store_true", help="Przetwarzaj rekursywnie katalog zdjęć (-r dla exiftool).")
|
|
||||||
|
|
||||||
g_out = p.add_argument_group("Zapis")
|
|
||||||
g_out.add_argument("--write", choices=["inplace", "backup"], default="inplace",
|
|
||||||
help="Tryb zapisu: 'inplace' (overwrite_original) lub 'backup' (domyślny tryb exiftool z plikami *_original).")
|
|
||||||
g_out.add_argument("--dry-run", action="store_true", help="Nie zapisuj EXIF, tylko pokaż co byłoby wykonane.")
|
|
||||||
|
|
||||||
g_misc = p.add_argument_group("Różne")
|
|
||||||
g_misc.add_argument("--tmpdir", type=str, help="Katalog tymczasowy na plik GPX (jeśli generowany z JSON).")
|
|
||||||
g_misc.add_argument("--verbose", action="store_true", help="Więcej logów.")
|
|
||||||
g_misc.add_argument("--overwrite-existing-gps", action="store_true",
|
|
||||||
help="Domyślnie nie nadpisuje istniejącego GPS w EXIF. Dodaj ten przełącznik, aby nadpisać.")
|
|
||||||
|
|
||||||
return p.parse_args()
|
|
||||||
|
|
||||||
def _glob_patterns(directory: Path, patterns_csv: str) -> List[Path]:
|
|
||||||
patterns = [p.strip() for p in patterns_csv.split(",") if p.strip()]
|
|
||||||
files: List[Path] = []
|
|
||||||
for patt in patterns:
|
|
||||||
files.extend(directory.rglob(patt))
|
|
||||||
# deduplikacja i sort
|
|
||||||
return sorted(set(files))
|
|
||||||
|
|
||||||
def discover_json_files(path: Path, patterns_csv: str) -> List[Path]:
|
|
||||||
if path.is_file():
|
|
||||||
return [path]
|
|
||||||
elif path.is_dir():
|
|
||||||
return _glob_patterns(path, patterns_csv)
|
|
||||||
else:
|
|
||||||
raise FileNotFoundError(f"Ścieżka z danymi OwnTracks nie istnieje: {path}")
|
|
||||||
|
|
||||||
def _iter_json_objects_from_string(buf: str) -> Iterable[Any]:
|
|
||||||
"""Próbuje: klasyczny JSON -> JSONL/NDJSON linia po linii -> sekwencyjne raw_decode."""
|
|
||||||
# 1) klasyczny JSON
|
|
||||||
try:
|
|
||||||
obj = json.loads(buf)
|
|
||||||
if isinstance(obj, list):
|
|
||||||
for x in obj:
|
|
||||||
yield x
|
|
||||||
return
|
|
||||||
else:
|
|
||||||
yield obj
|
|
||||||
return
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
# 2) JSON Lines / NDJSON
|
|
||||||
parsed_any = False
|
|
||||||
for line in buf.splitlines():
|
|
||||||
s = line.strip()
|
|
||||||
if not s or s.startswith("#"):
|
|
||||||
continue
|
|
||||||
try:
|
|
||||||
yield json.loads(s)
|
|
||||||
parsed_any = True
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
if parsed_any:
|
|
||||||
return
|
|
||||||
# 3) Wiele sklejonych obiektów JSON
|
|
||||||
dec = json.JSONDecoder()
|
|
||||||
i = 0
|
|
||||||
n = len(buf)
|
|
||||||
while i < n:
|
|
||||||
while i < n and buf[i].isspace():
|
|
||||||
i += 1
|
|
||||||
if i >= n:
|
|
||||||
break
|
|
||||||
try:
|
|
||||||
obj, j = dec.raw_decode(buf, idx=i)
|
|
||||||
yield obj
|
|
||||||
i = j
|
|
||||||
except Exception:
|
|
||||||
i += 1
|
|
||||||
|
|
||||||
def load_owntracks_points(json_files: List[Path], verbose=False) -> List[Dict[str, Any]]:
|
|
||||||
"""Czyta punkty OwnTracks z wielu wariantów plików. Wymaga lat/lon/tst."""
|
|
||||||
points: List[Dict[str, Any]] = []
|
|
||||||
for f in json_files:
|
|
||||||
try:
|
|
||||||
raw = f.read_text(encoding="utf-8")
|
|
||||||
except Exception as e:
|
|
||||||
if verbose:
|
|
||||||
print(f"[WARN] Nie udało się odczytać {f}: {e}", file=sys.stderr)
|
|
||||||
continue
|
|
||||||
any_in_file = False
|
|
||||||
for entry in _iter_json_objects_from_string(raw):
|
|
||||||
try:
|
|
||||||
lat = float(entry["lat"])
|
|
||||||
lon = float(entry["lon"])
|
|
||||||
tst = int(entry["tst"]) # epoch sekundy
|
|
||||||
if tst <= 0:
|
|
||||||
continue
|
|
||||||
dt = datetime.fromtimestamp(tst, tz=timezone.utc)
|
|
||||||
points.append({"lat": lat, "lon": lon, "time": dt})
|
|
||||||
any_in_file = True
|
|
||||||
except Exception:
|
|
||||||
continue
|
|
||||||
if verbose and not any_in_file:
|
|
||||||
print(f"[WARN] Brak poprawnych rekordów lat/lon/tst w pliku: {f}", file=sys.stderr)
|
|
||||||
points.sort(key=lambda x: x["time"])
|
|
||||||
if verbose:
|
|
||||||
if points:
|
|
||||||
print(f"[INFO] Załadowano {len(points)} punktów OwnTracks. Zakres: {points[0]['time']} .. {points[-1]['time']} (UTC)")
|
|
||||||
else:
|
|
||||||
print("[WARN] Nie znaleziono żadnych punktów OwnTracks (lat/lon/tst).")
|
|
||||||
return points
|
|
||||||
|
|
||||||
def write_gpx(points: List[Dict[str, Any]], out_path: Path, verbose=False) -> None:
|
|
||||||
def iso8601(dt: datetime) -> str:
|
|
||||||
return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
|
|
||||||
with open(out_path, "w", encoding="utf-8") as gpx:
|
|
||||||
gpx.write('<?xml version="1.0" encoding="UTF-8"?>\n')
|
|
||||||
gpx.write('<gpx version="1.1" creator="immich_geotag_from_owntracks.py" xmlns="http://www.topografix.com/GPX/1/1">\n')
|
|
||||||
gpx.write(' <trk>\n')
|
|
||||||
gpx.write(' <name>OwnTracks Export</name>\n')
|
|
||||||
gpx.write(' <trkseg>\n')
|
|
||||||
for pt in points:
|
|
||||||
gpx.write(f' <trkpt lat="{pt["lat"]:.8f}" lon="{pt["lon"]:.8f}"><time>{iso8601(pt["time"])}</time></trkpt>\n')
|
|
||||||
gpx.write(' </trkseg>\n')
|
|
||||||
gpx.write(' </trk>\n')
|
|
||||||
gpx.write('</gpx>\n')
|
|
||||||
if verbose:
|
|
||||||
print(f"[INFO] Zapisano GPX: {out_path} ({len(points)} punktów)")
|
|
||||||
|
|
||||||
def ensure_exiftool_available() -> None:
|
|
||||||
try:
|
|
||||||
subprocess.run(["exiftool", "-ver"], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
||||||
except (subprocess.CalledProcessError, FileNotFoundError):
|
|
||||||
print("[ERROR] exiftool nie jest dostępny w PATH. Zainstaluj exiftool i spróbuj ponownie.", file=sys.stderr)
|
|
||||||
sys.exit(2)
|
|
||||||
|
|
||||||
def build_exiftool_cmd(
|
|
||||||
gpx_path: Path,
|
|
||||||
photos_path: Path,
|
|
||||||
interpolate_secs: int,
|
|
||||||
time_offset: str,
|
|
||||||
recursive: bool,
|
|
||||||
write_mode: str,
|
|
||||||
dry_run: bool,
|
|
||||||
skip_existing_gps: bool,
|
|
||||||
) -> List[str]:
|
|
||||||
cmd: List[str] = ["exiftool"]
|
|
||||||
if recursive and photos_path.is_dir():
|
|
||||||
cmd.append("-r")
|
|
||||||
|
|
||||||
# Pomijaj pliki, które już mają GPS
|
|
||||||
if skip_existing_gps:
|
|
||||||
cmd.extend(["-if", "not $GPSLatitude"])
|
|
||||||
|
|
||||||
# dopasowanie w oknie czasowym
|
|
||||||
cmd.extend(["-api", f"GeoMaxIntSecs={interpolate_secs}"])
|
|
||||||
|
|
||||||
# korekta czasu
|
|
||||||
if time_offset:
|
|
||||||
cmd.append(f"-geosync={time_offset}")
|
|
||||||
|
|
||||||
# źródło geotagów
|
|
||||||
cmd.extend(["-geotag", str(gpx_path)])
|
|
||||||
|
|
||||||
# zapis
|
|
||||||
if write_mode == "inplace" and not dry_run:
|
|
||||||
cmd.append("-overwrite_original")
|
|
||||||
|
|
||||||
# cel (ostatni argument)
|
|
||||||
cmd.append(str(photos_path))
|
|
||||||
|
|
||||||
# symulacja
|
|
||||||
if dry_run:
|
|
||||||
cmd.insert(1, "-n") # wartości numeryczne bez formatowania
|
|
||||||
cmd.append("-S") # krótsze wyjście
|
|
||||||
cmd.append("-v2") # szczegóły operacji
|
|
||||||
|
|
||||||
return cmd
|
|
||||||
|
|
||||||
def run_exiftool(cmd: List[str], verbose=False) -> int:
|
|
||||||
if verbose:
|
|
||||||
print("[INFO] Uruchamiam:", " ".join(cmd))
|
|
||||||
try:
|
|
||||||
proc = subprocess.run(cmd, check=False)
|
|
||||||
return proc.returncode
|
|
||||||
except FileNotFoundError:
|
|
||||||
print("[ERROR] exiftool nie znaleziony.", file=sys.stderr)
|
|
||||||
return 127
|
|
||||||
|
|
||||||
def main() -> int:
|
|
||||||
args = parse_args()
|
|
||||||
ensure_exiftool_available()
|
|
||||||
|
|
||||||
photos_path = Path(args.photos)
|
|
||||||
if not photos_path.exists():
|
|
||||||
print(f"[ERROR] Ścieżka do zdjęć nie istnieje: {photos_path}", file=sys.stderr)
|
|
||||||
return 1
|
|
||||||
|
|
||||||
# 1) Przygotuj GPX
|
|
||||||
gpx_path: Optional[Path] = None
|
|
||||||
temp_dir: Optional[Path] = None
|
|
||||||
|
|
||||||
if args.gpx:
|
|
||||||
gpx_path = Path(args.gpx)
|
|
||||||
if not gpx_path.exists():
|
|
||||||
print(f"[ERROR] Plik GPX nie istnieje: {gpx_path}", file=sys.stderr)
|
|
||||||
return 1
|
|
||||||
if args.verbose:
|
|
||||||
print(f"[INFO] Używam dostarczonego GPX: {gpx_path}")
|
|
||||||
else:
|
|
||||||
if not args.json:
|
|
||||||
print("[ERROR] Musisz podać --gpx lub --json.", file=sys.stderr)
|
|
||||||
return 1
|
|
||||||
|
|
||||||
json_path = Path(args.json)
|
|
||||||
try:
|
|
||||||
files = discover_json_files(json_path, args.json_glob)
|
|
||||||
except FileNotFoundError as e:
|
|
||||||
print(f"[ERROR] {e}", file=sys.stderr)
|
|
||||||
return 1
|
|
||||||
|
|
||||||
if args.verbose:
|
|
||||||
print(f"[INFO] Znaleziono {len(files)} plików danych (wzorce: {args.json_glob}).")
|
|
||||||
|
|
||||||
points = load_owntracks_points(files, verbose=args.verbose)
|
|
||||||
if not points:
|
|
||||||
print("[ERROR] Brak punktów z OwnTracks po wczytaniu plików (lat/lon/tst nie znaleziono).", file=sys.stderr)
|
|
||||||
return 1
|
|
||||||
|
|
||||||
temp_dir = Path(args.tmpdir) if args.tmpdir else Path(tempfile.mkdtemp(prefix="owntracks_gpx_"))
|
|
||||||
temp_dir.mkdir(parents=True, exist_ok=True)
|
|
||||||
gpx_path = temp_dir / "owntracks_export.gpx"
|
|
||||||
write_gpx(points, gpx_path, verbose=args.verbose)
|
|
||||||
|
|
||||||
assert gpx_path is not None
|
|
||||||
|
|
||||||
# 2) Zbuduj i uruchom exiftool
|
|
||||||
skip_existing_gps = not args.overwrite_existing_gps
|
|
||||||
cmd = build_exiftool_cmd(
|
|
||||||
gpx_path=gpx_path,
|
|
||||||
photos_path=photos_path,
|
|
||||||
interpolate_secs=args.interpolate_secs,
|
|
||||||
time_offset=args.time_offset,
|
|
||||||
recursive=args.recursive,
|
|
||||||
write_mode=args.write,
|
|
||||||
dry_run=args.dry_run,
|
|
||||||
skip_existing_gps=skip_existing_gps,
|
|
||||||
)
|
|
||||||
|
|
||||||
rc = run_exiftool(cmd, verbose=args.verbose)
|
|
||||||
if rc != 0:
|
|
||||||
print(f"[ERROR] exiftool zakończył się kodem {rc}.", file=sys.stderr)
|
|
||||||
return rc
|
|
||||||
|
|
||||||
if args.dry_run:
|
|
||||||
print("[OK] Dry-run zakończony. Wygląda dobrze. Usuń --dry-run, aby zapisać EXIF.")
|
|
||||||
else:
|
|
||||||
print("[OK] Geotagowanie zakończone sukcesem. (Pliki z istniejącym GPS zostały pominięte.)")
|
|
||||||
|
|
||||||
if temp_dir:
|
|
||||||
print(f"[INFO] Tymczasowy GPX: {gpx_path} (możesz go zachować do archiwum).")
|
|
||||||
|
|
||||||
return 0
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
sys.exit(main())
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
"""
|
"""
|
||||||
immich_geotag_from_owntracks.py
|
immich_geotag_from_owntracks_wrong.py
|
||||||
|
|
||||||
Geotaguj zdjęcia na podstawie śladu z OwnTracks (JSON) lub pliku GPX
|
Geotaguj zdjęcia na podstawie śladu z OwnTracks (JSON) lub pliku GPX
|
||||||
z wykorzystaniem exiftool. Idealne do późniejszego importu / reindeksu w Immich.
|
z wykorzystaniem exiftool. Idealne do późniejszego importu / reindeksu w Immich.
|
||||||
|
|
@ -12,7 +12,7 @@ Wymagania:
|
||||||
|
|
||||||
Przykłady:
|
Przykłady:
|
||||||
1) JSON -> GPX -> geotag (rekurencyjnie po katalogu ze zdjęciami):
|
1) JSON -> GPX -> geotag (rekurencyjnie po katalogu ze zdjęciami):
|
||||||
python3 immich_geotag_from_owntracks.py \
|
python3 immich_geotag_from_owntracks_wrong.py \
|
||||||
--photos /ścieżka/do/zdjęć \
|
--photos /ścieżka/do/zdjęć \
|
||||||
--json /ścieżka/do/owntracks_json \
|
--json /ścieżka/do/owntracks_json \
|
||||||
--json-max-age-secs 10 \
|
--json-max-age-secs 10 \
|
||||||
|
|
@ -21,7 +21,7 @@ Przykłady:
|
||||||
--write inplace
|
--write inplace
|
||||||
|
|
||||||
2) GPX -> geotag:
|
2) GPX -> geotag:
|
||||||
python3 immich_geotag_from_owntracks.py \
|
python3 immich_geotag_from_owntracks_wrong.py \
|
||||||
--photos /ścieżka/do/zdjęć \
|
--photos /ścieżka/do/zdjęć \
|
||||||
--gpx /ścieżka/track.gpx \
|
--gpx /ścieżka/track.gpx \
|
||||||
--interpolate-secs 180 \
|
--interpolate-secs 180 \
|
||||||
|
|
@ -126,7 +126,7 @@ def write_gpx(points: List[Dict[str, Any]], out_path: Path, verbose=False) -> No
|
||||||
|
|
||||||
with open(out_path, "w", encoding="utf-8") as gpx:
|
with open(out_path, "w", encoding="utf-8") as gpx:
|
||||||
gpx.write('<?xml version="1.0" encoding="UTF-8"?>\n')
|
gpx.write('<?xml version="1.0" encoding="UTF-8"?>\n')
|
||||||
gpx.write('<gpx version="1.1" creator="immich_geotag_from_owntracks.py" xmlns="http://www.topografix.com/GPX/1/1">\n')
|
gpx.write('<gpx version="1.1" creator="immich_geotag_from_owntracks_wrong.py" xmlns="http://www.topografix.com/GPX/1/1">\n')
|
||||||
gpx.write(' <trk>\n')
|
gpx.write(' <trk>\n')
|
||||||
gpx.write(' <name>OwnTracks Export</name>\n')
|
gpx.write(' <name>OwnTracks Export</name>\n')
|
||||||
gpx.write(' <trkseg>\n')
|
gpx.write(' <trkseg>\n')
|
||||||
Loading…
Reference in a new issue