immich_owntracks/gpx2owntracks.py

501 lines
18 KiB
Python
Raw Permalink Normal View History

2025-08-28 17:07:42 +02:00
#!/usr/bin/env python3
"""
GPX OwnTracks (Recorder) importer
Converts .gpx tracks (trk/seg/pt) to OwnTracks Location JSON messages and either:
writes a JSONL file (one message per line), or
writes individual .json files in a Recorder-like folder structure, or
POSTs messages to an OwnTracks Recorder HTTP endpoint (/pub), optionally throttled.
Features
- Config via YAML + CLI flags that override config
- Robust error handling & validation
- Optional speed calculation (m/s) and altitude inclusion
- Time window filtering
- Dry-run mode
- Logging with levels
OwnTracks message fields used
_type: "location"
tst: Unix timestamp (UTC)
lat, lon, alt (optional), acc (accuracy meters), vel (optional m/s), tid (2-char), conn ("w")
Dependencies
pip install gpxpy pyyaml requests
Usage examples
# Basic: create JSONL from GPX
python gpx2owntracks.py input.gpx --jsonl-path out/track.jsonl --user alice --device pixel --tid AL
# Using YAML config (CLI overrides YAML)
python gpx2owntracks.py input.gpx --config config.yaml --mode http --rate 5
# Write Recorder-like file tree
python gpx2owntracks.py input.gpx --mode files --files-base-dir /var/lib/ot-recorder/store/recorder \
--user alice --device pixel
# Post directly to Recorder (HTTP /pub)
python gpx2owntracks.py input.gpx --mode http --http-url https://rec.example.com/pub \
--http-user user --http-pass pass --no-verify
Example config.yaml
-------------------
user: alice
device: pixel
Tid: AL # two characters (case-sensitive)
default_acc: 10
include_alt: true
calc_speed: true
assume_timezone: UTC # used only if GPX points lack timezone info
mode: jsonl # jsonl | files | http
jsonl_path: ./out/track.jsonl
files_base_dir: /var/lib/ot-recorder/store/recorder
http:
url: https://rec.example.com/pub
username: user
password: pass
tls_verify: true
rate_limit_per_sec: 5 # for http mode
start_time: null # ISO8601 filter (inclusive), e.g. "2025-08-01T00:00:00Z"
end_time: null # ISO8601 filter (exclusive)
log_level: INFO
Recorder file mode layout
<files_base_dir>/<user>/<device>/<YYYY>/<MM>/<DD>/<tst>.json
Note: When writing files, Recorder will pick them up if they are placed under its store path.
"""
from __future__ import annotations
import argparse
import dataclasses
import datetime as dt
import json
import logging
import math
import os
import sys
import time
from pathlib import Path
from typing import Iterable, List, Optional, Tuple
import gpxpy
import gpxpy.gpx
import yaml
# Optional imports for HTTP mode
try:
import requests # type: ignore
except Exception: # pragma: no cover
requests = None
ISO8601 = "%Y-%m-%dT%H:%M:%S%z"
@dataclasses.dataclass
class Config:
user: Optional[str] = None
device: Optional[str] = None
tid: Optional[str] = None
default_acc: float = 10.0
include_alt: bool = True
calc_speed: bool = False
assume_timezone: str = "UTC"
mode: str = "jsonl" # jsonl | files | http
jsonl_path: Optional[str] = None
files_base_dir: Optional[str] = None
http_url: Optional[str] = None
http_username: Optional[str] = None
http_password: Optional[str] = None
http_tls_verify: bool = True
rate_limit_per_sec: Optional[float] = None
start_time: Optional[str] = None # ISO8601
end_time: Optional[str] = None # ISO8601
log_level: str = "INFO"
class ConfigError(Exception):
pass
def load_config(path: Optional[str]) -> Config:
if not path:
return Config()
p = Path(path)
if not p.exists():
raise ConfigError(f"Config file not found: {p}")
try:
with p.open("r", encoding="utf-8") as f:
data = yaml.safe_load(f) or {}
except Exception as e:
raise ConfigError(f"Failed to parse YAML: {e}")
def get_bool(d, k, default):
v = d.get(k, default)
return bool(v)
http = data.get("http", {}) or {}
cfg = Config(
user=data.get("user"),
device=data.get("device"),
tid=data.get("Tid") or data.get("tid"),
default_acc=float(data.get("default_acc", 10.0)),
include_alt=get_bool(data, "include_alt", True),
calc_speed=get_bool(data, "calc_speed", False),
assume_timezone=str(data.get("assume_timezone", "UTC")),
mode=str(data.get("mode", "jsonl")),
jsonl_path=data.get("jsonl_path"),
files_base_dir=data.get("files_base_dir"),
http_url=http.get("url") or data.get("http_url"),
http_username=http.get("username") or data.get("http_username"),
http_password=http.get("password") or data.get("http_password"),
http_tls_verify=bool(http.get("tls_verify", data.get("http_tls_verify", True))),
rate_limit_per_sec=(float(data.get("rate_limit_per_sec")) if data.get("rate_limit_per_sec") is not None else None),
start_time=data.get("start_time"),
end_time=data.get("end_time"),
log_level=str(data.get("log_level", "INFO")).upper(),
)
return cfg
def merge_cli_over_config(cfg: Config, args: argparse.Namespace) -> Config:
# Helper to prefer CLI when provided
def override(val, cli_val):
return cli_val if cli_val is not None else val
cfg.user = override(cfg.user, args.user)
cfg.device = override(cfg.device, args.device)
cfg.tid = override(cfg.tid, args.tid)
cfg.default_acc = override(cfg.default_acc, args.acc)
if args.include_alt is not None:
cfg.include_alt = args.include_alt
if args.calc_speed is not None:
cfg.calc_speed = args.calc_speed
cfg.assume_timezone = override(cfg.assume_timezone, args.assume_timezone)
cfg.mode = override(cfg.mode, args.mode)
cfg.jsonl_path = override(cfg.jsonl_path, args.jsonl_path)
cfg.files_base_dir = override(cfg.files_base_dir, args.files_base_dir)
cfg.http_url = override(cfg.http_url, args.http_url)
cfg.http_username = override(cfg.http_username, args.http_user)
cfg.http_password = override(cfg.http_password, args.http_pass)
if args.verify is not None:
cfg.http_tls_verify = args.verify
cfg.rate_limit_per_sec = override(cfg.rate_limit_per_sec, args.rate)
cfg.start_time = override(cfg.start_time, args.start_time)
cfg.end_time = override(cfg.end_time, args.end_time)
cfg.log_level = override(cfg.log_level, args.log_level.upper() if args.log_level else None)
return cfg
class OwnTracksMessage(dict):
"""Typed helper for OwnTracks location message."""
@staticmethod
def from_point(point: gpxpy.gpx.GPXTrackPoint, *,
default_acc: float = 10.0,
include_alt: bool = True,
vel: Optional[float] = None,
tid: Optional[str] = None) -> "OwnTracksMessage":
if not point.time:
raise ValueError("GPX point missing timestamp")
# Ensure timezone-aware UTC
if point.time.tzinfo is None:
# assume input is UTC naive
t_utc = point.time.replace(tzinfo=dt.timezone.utc)
else:
t_utc = point.time.astimezone(dt.timezone.utc)
tst = int(t_utc.timestamp())
msg: OwnTracksMessage = OwnTracksMessage({
"_type": "location",
"tst": tst,
"lat": float(point.latitude),
"lon": float(point.longitude),
"acc": float(default_acc),
"conn": "w",
})
if include_alt and point.elevation is not None and not math.isnan(point.elevation):
msg["alt"] = float(point.elevation)
if vel is not None and not math.isnan(vel):
msg["vel"] = float(max(0.0, vel))
if tid:
if len(tid) != 2:
raise ValueError("tid must be exactly 2 characters, e.g. 'AL'")
msg["tid"] = tid
return msg
@dataclasses.dataclass
class OutputSink:
mode: str
jsonl_path: Optional[Path] = None
base_dir: Optional[Path] = None
http_url: Optional[str] = None
http_auth: Optional[Tuple[str, str]] = None
http_verify: bool = True
rate_limit_per_sec: Optional[float] = None
user: Optional[str] = None
device: Optional[str] = None
dry_run: bool = False
def open(self):
if self.mode == "jsonl" and self.jsonl_path and not self.dry_run:
self.jsonl_path.parent.mkdir(parents=True, exist_ok=True)
self._jsonl_file = self.jsonl_path.open("w", encoding="utf-8")
else:
self._jsonl_file = None
if self.mode == "http":
if requests is None:
raise ConfigError("requests not available; install it or use jsonl/files mode")
if not self.http_url:
raise ConfigError("HTTP mode requires http_url")
def close(self):
if hasattr(self, "_jsonl_file") and self._jsonl_file:
self._jsonl_file.close()
def _rate_limit(self):
if self.rate_limit_per_sec and self.rate_limit_per_sec > 0:
time.sleep(1.0 / self.rate_limit_per_sec)
def write(self, msg: OwnTracksMessage):
if self.mode == "jsonl":
line = json.dumps(msg, separators=(",", ":"))
if self.dry_run:
logging.debug(line)
else:
assert self._jsonl_file is not None
self._jsonl_file.write(line + "\n")
elif self.mode == "files":
if not (self.base_dir and self.user and self.device):
raise ConfigError("files mode requires files_base_dir, user, and device")
if "tst" not in msg:
raise ValueError("message missing tst")
t = dt.datetime.fromtimestamp(int(msg["tst"]), tz=dt.timezone.utc)
out_dir = self.base_dir / self.user / self.device / t.strftime("%Y/%m/%d")
out_dir.mkdir(parents=True, exist_ok=True)
out_path = out_dir / f"{int(msg['tst'])}.json"
if self.dry_run:
logging.info("Would write %s", out_path)
else:
with out_path.open("w", encoding="utf-8") as f:
json.dump(msg, f, separators=(",", ":"))
elif self.mode == "http":
if not self.http_url:
raise ConfigError("HTTP mode requires http_url")
if self.dry_run:
logging.info("Would POST to %s: %s", self.http_url, msg)
else:
try:
resp = requests.post(
self.http_url,
json=msg,
auth=self.http_auth,
timeout=15,
verify=self.http_verify,
headers={"Content-Type": "application/json"},
)
if resp.status_code >= 400:
raise RuntimeError(f"HTTP {resp.status_code}: {resp.text[:200]}")
except Exception as e:
logging.error("HTTP POST failed: %s", e)
raise
self._rate_limit()
else:
raise ConfigError(f"Unknown mode: {self.mode}")
def parse_time(s: Optional[str]) -> Optional[dt.datetime]:
if not s:
return None
try:
# Allow 'Z' suffix
if s.endswith("Z"):
s = s.replace("Z", "+00:00")
return dt.datetime.fromisoformat(s)
except Exception as e:
raise ConfigError(f"Invalid ISO8601 time '{s}': {e}")
def compute_speed_m_s(prev: gpxpy.gpx.GPXTrackPoint, cur: gpxpy.gpx.GPXTrackPoint) -> Optional[float]:
if not prev.time or not cur.time:
return None
t1 = prev.time
t2 = cur.time
if t1.tzinfo is None:
t1 = t1.replace(tzinfo=dt.timezone.utc)
if t2.tzinfo is None:
t2 = t2.replace(tzinfo=dt.timezone.utc)
dt_s = (t2 - t1).total_seconds()
if dt_s <= 0:
return 0.0
# Haversine distance in meters
R = 6371000.0
phi1 = math.radians(prev.latitude)
phi2 = math.radians(cur.latitude)
dphi = phi2 - phi1
dlambda = math.radians(cur.longitude - prev.longitude)
a = math.sin(dphi/2)**2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlambda/2)**2
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
d_m = R * c
return d_m / dt_s
def iter_gpx_points(gpx: gpxpy.gpx.GPX, start: Optional[dt.datetime], end: Optional[dt.datetime]) -> Iterable[gpxpy.gpx.GPXTrackPoint]:
for trk in gpx.tracks:
for seg in trk.segments:
for pt in seg.points:
if start and pt.time and (pt.time if pt.time.tzinfo else pt.time.replace(tzinfo=dt.timezone.utc)) < start:
continue
if end and pt.time and (pt.time if pt.time.tzinfo else pt.time.replace(tzinfo=dt.timezone.utc)) >= end:
continue
yield pt
def build_arg_parser() -> argparse.ArgumentParser:
p = argparse.ArgumentParser(
description="Convert GPX track to OwnTracks messages and output to JSONL, files, or HTTP.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p.add_argument("gpx", help="Input .gpx file")
p.add_argument("--config", help="Path to YAML config")
p.add_argument("--user", help="OwnTracks user")
p.add_argument("--device", help="OwnTracks device")
p.add_argument("--tid", help="2-character tracker id (e.g. AL)")
p.add_argument("--acc", type=float, help="Default accuracy (meters)")
p.add_argument("--include-alt", dest="include_alt", action=argparse.BooleanOptionalAction, default=None,
help="Include altitude if available")
p.add_argument("--calc-speed", dest="calc_speed", action=argparse.BooleanOptionalAction, default=None,
help="Calculate speed between points (m/s)")
p.add_argument("--assume-timezone", help="Timezone to assume if GPX times are naive (e.g. UTC, Europe/Warsaw)")
p.add_argument("--mode", choices=["jsonl", "files", "http"], help="Output mode")
p.add_argument("--jsonl-path", help="Output JSONL path")
p.add_argument("--files-base-dir", help="Recorder store base dir for files mode")
p.add_argument("--http-url", help="Recorder /pub URL for HTTP mode")
p.add_argument("--http-user", help="HTTP basic auth username")
p.add_argument("--http-pass", help="HTTP basic auth password")
p.add_argument("--verify", dest="verify", action=argparse.BooleanOptionalAction, default=None,
help="Verify TLS certificate in HTTP mode")
p.add_argument("--rate", type=float, help="Max posts per second (HTTP mode)")
p.add_argument("--start-time", help="Filter: start time ISO8601 inclusive (e.g. 2025-08-01T00:00:00Z)")
p.add_argument("--end-time", help="Filter: end time ISO8601 exclusive (e.g. 2025-09-01T00:00:00Z)")
p.add_argument("--dry-run", action="store_true", help="Do not write/POST, just validate and log actions")
p.add_argument("--log-level", default=None, help="Logging level (DEBUG, INFO, WARNING, ERROR)")
return p
def main(argv: Optional[List[str]] = None) -> int:
args = build_arg_parser().parse_args(argv)
base_cfg = load_config(args.config)
cfg = merge_cli_over_config(base_cfg, args)
log_level = getattr(logging, cfg.log_level.upper(), logging.INFO)
logging.basicConfig(level=log_level, format="%(levelname)s %(message)s")
# Basic validations
in_path = Path(args.gpx)
if not in_path.exists():
logging.error("Input GPX not found: %s", in_path)
return 2
if cfg.mode not in {"jsonl", "files", "http"}:
logging.error("Invalid mode: %s", cfg.mode)
return 2
if cfg.mode == "jsonl" and not cfg.jsonl_path:
logging.error("jsonl mode requires --jsonl-path or jsonl_path in config")
return 2
if cfg.mode == "files" and (not cfg.files_base_dir or not cfg.user or not cfg.device):
logging.error("files mode requires --files-base-dir, --user, and --device")
return 2
if cfg.mode == "http" and not cfg.http_url:
logging.error("http mode requires --http-url or http.url in config")
return 2
if cfg.tid and len(cfg.tid) != 2:
logging.error("--tid must be exactly 2 characters")
return 2
# Parse time filters
start = parse_time(cfg.start_time)
end = parse_time(cfg.end_time)
# Load GPX
try:
with in_path.open("r", encoding="utf-8") as f:
gpx = gpxpy.parse(f)
except Exception as e:
logging.error("Failed to parse GPX: %s", e)
return 2
# Prepare sink
sink = OutputSink(
mode=cfg.mode,
jsonl_path=Path(cfg.jsonl_path) if cfg.jsonl_path else None,
base_dir=Path(cfg.files_base_dir) if cfg.files_base_dir else None,
http_url=cfg.http_url,
http_auth=(cfg.http_username, cfg.http_password) if cfg.http_username or cfg.http_password else None,
http_verify=cfg.http_tls_verify,
rate_limit_per_sec=cfg.rate_limit_per_sec,
user=cfg.user,
device=cfg.device,
dry_run=args.dry_run,
)
try:
sink.open()
except Exception as e:
logging.error("Failed to open output sink: %s", e)
return 2
# Iterate points and emit
count = 0
prev_pt: Optional[gpxpy.gpx.GPXTrackPoint] = None
try:
for pt in iter_gpx_points(gpx, start, end):
try:
vel = compute_speed_m_s(prev_pt, pt) if (cfg.calc_speed and prev_pt) else None
msg = OwnTracksMessage.from_point(
pt,
default_acc=cfg.default_acc,
include_alt=cfg.include_alt,
vel=vel,
tid=cfg.tid,
)
sink.write(msg)
count += 1
prev_pt = pt
except Exception as e:
logging.warning("Skipping point due to error: %s", e)
continue
finally:
sink.close()
if count == 0:
logging.warning("No points exported.")
else:
logging.info("Exported %d points via mode='%s'", count, cfg.mode)
return 0
if __name__ == "__main__":
try:
sys.exit(main())
except KeyboardInterrupt:
print("Interrupted", file=sys.stderr)
sys.exit(130)