Compare commits
No commits in common. "72c5a53610ad9fd6f3156ca20a14aea6c235fe98" and "95a976e930bf7c09c6aa2710a702411363de6e4e" have entirely different histories.
72c5a53610
...
95a976e930
|
|
@ -1,96 +0,0 @@
|
||||||
# Homelab Event System
|
|
||||||
|
|
||||||
The homelab multi-agent platform uses a filesystem-first event architecture for observability, auditability, and agent reasoning.
|
|
||||||
|
|
||||||
## Architecture
|
|
||||||
|
|
||||||
Events are stored as individual JSON files on the local filesystem. This ensures that the system is resilient to network outages and requires no external dependencies like databases or message brokers.
|
|
||||||
|
|
||||||
### Filesystem Layout
|
|
||||||
|
|
||||||
Events are organized by date and node:
|
|
||||||
|
|
||||||
```
|
|
||||||
/opt/homelab/events/YYYY-MM-DD/node-name/TIMESTAMP_TYPE_UUID.json
|
|
||||||
```
|
|
||||||
|
|
||||||
- **Date-based partitioning** allows for easy archival and rotation.
|
|
||||||
- **Node-based partitioning** supports multi-node environments and offline synchronization.
|
|
||||||
- **Append-only** nature ensures an immutable audit trail.
|
|
||||||
|
|
||||||
## Event Schema
|
|
||||||
|
|
||||||
Each event is a JSON object with the following fields:
|
|
||||||
|
|
||||||
| Field | Type | Description |
|
|
||||||
|------------------|--------|-------------------------------------------------------|
|
|
||||||
| `timestamp` | string | ISO 8601 UTC timestamp |
|
|
||||||
| `node` | string | Hostname of the node where the event originated |
|
|
||||||
| `type` | string | Normalized event type |
|
|
||||||
| `severity` | string | `info`, `warning`, `error`, `critical` |
|
|
||||||
| `source` | string | Component that emitted the event (e.g., `deploy.sh`) |
|
|
||||||
| `service` | string | Service name or `all` |
|
|
||||||
| `correlation_id` | string | Used to link related events (e.g., deployment run ID) |
|
|
||||||
| `payload` | object | Arbitrary event-specific data |
|
|
||||||
|
|
||||||
### Normalized Event Types
|
|
||||||
|
|
||||||
- `deployment_started`: A deployment process has begun.
|
|
||||||
- `deployment_completed`: A deployment finished successfully.
|
|
||||||
- `deployment_failed`: A deployment failed at some stage.
|
|
||||||
- `service_unhealthy`: A healthcheck failed for a service.
|
|
||||||
- `service_recovered`: A service returned to healthy state.
|
|
||||||
- `node_offline`: Node detected it is losing connectivity (heartbeat loss).
|
|
||||||
- `node_online`: Node detected it is back online.
|
|
||||||
- `healthcheck_failed`: Generic healthcheck failure.
|
|
||||||
- `remediation_started`: An automated or manual fix is being applied.
|
|
||||||
- `remediation_completed`: Remediation finished.
|
|
||||||
|
|
||||||
## Usage
|
|
||||||
|
|
||||||
### Shell Library
|
|
||||||
|
|
||||||
Source `scripts/lib/events.sh` to use the event library in bash scripts.
|
|
||||||
|
|
||||||
```bash
|
|
||||||
source scripts/lib/events.sh
|
|
||||||
|
|
||||||
# Emit an event
|
|
||||||
emit_event "deployment_started" "info" "my-script.sh" "mosquitto" "unique-cid" '{"version": "1.0"}'
|
|
||||||
|
|
||||||
# List events for today
|
|
||||||
list_events
|
|
||||||
```
|
|
||||||
|
|
||||||
### Python Library
|
|
||||||
|
|
||||||
Import `scripts.lib.events` in Python scripts.
|
|
||||||
|
|
||||||
```python
|
|
||||||
from scripts.lib.events import emit_event
|
|
||||||
|
|
||||||
emit_event(
|
|
||||||
event_type="service_unhealthy",
|
|
||||||
severity="error",
|
|
||||||
source="monitor.py",
|
|
||||||
service="ollama",
|
|
||||||
correlation_id="12345",
|
|
||||||
payload={"error": "OOM"}
|
|
||||||
)
|
|
||||||
```
|
|
||||||
|
|
||||||
## Operator & AI Agent Reasoning
|
|
||||||
|
|
||||||
The event system is designed to support future AI agents:
|
|
||||||
|
|
||||||
1. **Causal Chains**: By using `correlation_id`, agents can trace a failure back to a specific deployment or remediation attempt.
|
|
||||||
2. **Resumable Remediation**: Agents can check the latest `remediation_started` events to see what has already been tried.
|
|
||||||
3. **Auditability**: Every action taken by an operator or agent leaves a permanent record on the filesystem.
|
|
||||||
4. **Offline Capability**: Events are stored locally and can be synced when connectivity is restored.
|
|
||||||
|
|
||||||
## Example Flow: Deployment Failure & Recovery
|
|
||||||
|
|
||||||
1. **Event 1**: `deployment_started` (Type: deployment, CID: `deploy-882`)
|
|
||||||
2. **Event 2**: `deployment_failed` (Type: deployment, CID: `deploy-882`, Payload: `{"stage": "verify", "error": "port 1883 not bound"}`)
|
|
||||||
3. **Event 3**: `remediation_started` (Source: `diagnostics.sh`, CID: `deploy-882`)
|
|
||||||
4. **Event 4**: `service_recovered` (Source: `healthcheck.sh`, Service: `mosquitto`, CID: `deploy-882`)
|
|
||||||
|
|
@ -70,8 +70,6 @@ stage_prepare() {
|
||||||
log "INFO" "Stage: PREPARE ($host)"
|
log "INFO" "Stage: PREPARE ($host)"
|
||||||
set_stage "prepare"
|
set_stage "prepare"
|
||||||
|
|
||||||
emit_event "deployment_started" "info" "deploy.sh" "all" "${TIMESTAMP}" "{\"stage\": \"prepare\"}"
|
|
||||||
|
|
||||||
cd "$REPO_PATH" || exit 1
|
cd "$REPO_PATH" || exit 1
|
||||||
log "INFO" "Pulling latest changes..."
|
log "INFO" "Pulling latest changes..."
|
||||||
if ! git pull; then
|
if ! git pull; then
|
||||||
|
|
|
||||||
|
|
@ -6,10 +6,6 @@ collect_diagnostics() {
|
||||||
local service=$2
|
local service=$2
|
||||||
log "INFO" "Stage: DIAGNOSE ($host - ${service:-all})"
|
log "INFO" "Stage: DIAGNOSE ($host - ${service:-all})"
|
||||||
|
|
||||||
if [[ -n "$service" ]]; then
|
|
||||||
emit_event "remediation_started" "warning" "diagnostics.sh" "$service" "${TIMESTAMP}" "{\"reason\": \"failure_detected\"}"
|
|
||||||
fi
|
|
||||||
|
|
||||||
local diag_file="${LOG_DIR}/diagnostics_${TIMESTAMP}.txt"
|
local diag_file="${LOG_DIR}/diagnostics_${TIMESTAMP}.txt"
|
||||||
{
|
{
|
||||||
echo "--- DIAGNOSTICS FOR ${service:-all} (Host: $host, Time: $(date)) ---"
|
echo "--- DIAGNOSTICS FOR ${service:-all} (Host: $host, Time: $(date)) ---"
|
||||||
|
|
|
||||||
|
|
@ -1,85 +0,0 @@
|
||||||
import os
|
|
||||||
import json
|
|
||||||
import datetime
|
|
||||||
import uuid
|
|
||||||
import socket
|
|
||||||
|
|
||||||
EVENTS_BASE_DIR = os.getenv("RUNTIME_PATH", "/opt/homelab") + "/events"
|
|
||||||
|
|
||||||
def emit_event(event_type, severity, source, service, correlation_id, payload=None):
|
|
||||||
"""
|
|
||||||
Emits a normalized JSON event to the filesystem.
|
|
||||||
"""
|
|
||||||
if payload is None:
|
|
||||||
payload = {}
|
|
||||||
|
|
||||||
node = socket.gethostname()
|
|
||||||
now = datetime.datetime.now(datetime.timezone.utc)
|
|
||||||
timestamp = now.strftime("%Y-%m-%dT%H:%M:%SZ")
|
|
||||||
date_dir = now.strftime("%Y-%m-%d")
|
|
||||||
|
|
||||||
event_dir = os.path.join(EVENTS_BASE_DIR, date_dir, node)
|
|
||||||
os.makedirs(event_dir, exist_ok=True)
|
|
||||||
|
|
||||||
event_id = str(uuid.uuid4())
|
|
||||||
filename = f"{timestamp}_{event_type}_{event_id}.json"
|
|
||||||
event_path = os.path.join(event_dir, filename)
|
|
||||||
|
|
||||||
event_data = {
|
|
||||||
"timestamp": timestamp,
|
|
||||||
"node": node,
|
|
||||||
"type": event_type,
|
|
||||||
"severity": severity,
|
|
||||||
"source": source,
|
|
||||||
"service": service,
|
|
||||||
"correlation_id": correlation_id,
|
|
||||||
"payload": payload
|
|
||||||
}
|
|
||||||
|
|
||||||
with open(event_path, "w") as f:
|
|
||||||
json.dump(event_data, f, indent=2)
|
|
||||||
|
|
||||||
return event_path
|
|
||||||
|
|
||||||
def list_events(date_str=None, node=None):
|
|
||||||
"""
|
|
||||||
Lists paths to event files for a specific date and/or node.
|
|
||||||
"""
|
|
||||||
if date_str is None:
|
|
||||||
date_str = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d")
|
|
||||||
|
|
||||||
search_path = os.path.join(EVENTS_BASE_DIR, date_str)
|
|
||||||
if node:
|
|
||||||
search_path = os.path.join(search_path, node)
|
|
||||||
|
|
||||||
if not os.path.exists(search_path):
|
|
||||||
return []
|
|
||||||
|
|
||||||
event_files = []
|
|
||||||
for root, dirs, files in os.walk(search_path):
|
|
||||||
for file in files:
|
|
||||||
if file.endswith(".json"):
|
|
||||||
event_files.append(os.path.join(root, file))
|
|
||||||
|
|
||||||
return sorted(event_files)
|
|
||||||
|
|
||||||
def get_event(event_path):
|
|
||||||
"""
|
|
||||||
Reads and parses an event file.
|
|
||||||
"""
|
|
||||||
with open(event_path, "r") as f:
|
|
||||||
return json.load(f)
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
# Simple CLI for emitting events from Python
|
|
||||||
import sys
|
|
||||||
if len(sys.argv) > 1 and sys.argv[1] == "emit":
|
|
||||||
# emit <type> <severity> <source> <service> <cid> [payload_json]
|
|
||||||
etype = sys.argv[2]
|
|
||||||
sev = sys.argv[3]
|
|
||||||
src = sys.argv[4]
|
|
||||||
svc = sys.argv[5]
|
|
||||||
cid = sys.argv[6]
|
|
||||||
payload = json.loads(sys.argv[7]) if len(sys.argv) > 7 else {}
|
|
||||||
path = emit_event(etype, sev, src, svc, cid, payload)
|
|
||||||
print(f"Event emitted: {path}")
|
|
||||||
|
|
@ -1,79 +0,0 @@
|
||||||
#!/usr/bin/env bash
|
|
||||||
# events.sh - Filesystem-first event system for homelab
|
|
||||||
|
|
||||||
EVENTS_BASE_DIR="${RUNTIME_PATH:-/opt/homelab}/events"
|
|
||||||
|
|
||||||
# Emit a normalized JSON event
|
|
||||||
# Usage: emit_event <type> <severity> <source> <service> <correlation_id> <payload_json>
|
|
||||||
emit_event() {
|
|
||||||
local type=$1
|
|
||||||
local severity=$2
|
|
||||||
local source=$3
|
|
||||||
local service=$4
|
|
||||||
local correlation_id=$5
|
|
||||||
local payload=${6:-"{}"}
|
|
||||||
|
|
||||||
local node=$(hostname)
|
|
||||||
local timestamp=$(date -u +"%Y-%m-%dT%H:%M:%SZ")
|
|
||||||
local date_dir=$(date +"%Y-%m-%d")
|
|
||||||
|
|
||||||
local event_dir="${EVENTS_BASE_DIR}/${date_dir}/${node}"
|
|
||||||
mkdir -p "$event_dir"
|
|
||||||
|
|
||||||
# Generate a unique filename for the event to ensure append-only/no-overwrite
|
|
||||||
local event_id=$(cat /proc/sys/kernel/random/uuid 2>/dev/null || date +%s%N)
|
|
||||||
local event_file="${event_dir}/${timestamp}_${type}_${event_id}.json"
|
|
||||||
|
|
||||||
# Construct JSON
|
|
||||||
cat <<EOF > "$event_file"
|
|
||||||
{
|
|
||||||
"timestamp": "$timestamp",
|
|
||||||
"node": "$node",
|
|
||||||
"type": "$type",
|
|
||||||
"severity": "$severity",
|
|
||||||
"source": "$source",
|
|
||||||
"service": "$service",
|
|
||||||
"correlation_id": "$correlation_id",
|
|
||||||
"payload": $payload
|
|
||||||
}
|
|
||||||
EOF
|
|
||||||
|
|
||||||
# Also log to standard logging if available
|
|
||||||
if command -v log >/dev/null 2>&1; then
|
|
||||||
log "EVENT" "[$type] service=$service severity=$severity cid=$correlation_id"
|
|
||||||
fi
|
|
||||||
}
|
|
||||||
|
|
||||||
# Query recent events (last N events or by date)
|
|
||||||
# Usage: list_events [date] [node]
|
|
||||||
list_events() {
|
|
||||||
local target_date=${1:-$(date +"%Y-%m-%d")}
|
|
||||||
local target_node=$2
|
|
||||||
|
|
||||||
local search_path="${EVENTS_BASE_DIR}/${target_date}"
|
|
||||||
if [[ -n "$target_node" ]]; then
|
|
||||||
search_path="${search_path}/${target_node}"
|
|
||||||
fi
|
|
||||||
|
|
||||||
if [[ -d "$search_path" ]]; then
|
|
||||||
find "$search_path" -name "*.json" | sort
|
|
||||||
fi
|
|
||||||
}
|
|
||||||
|
|
||||||
# Simple filter helper
|
|
||||||
# Usage: filter_events <field> <value>
|
|
||||||
filter_events() {
|
|
||||||
local field=$1
|
|
||||||
local value=$2
|
|
||||||
local files=$3
|
|
||||||
|
|
||||||
for f in $files; do
|
|
||||||
if grep -q "\"$field\": \"$value\"" "$f"; then
|
|
||||||
echo "$f"
|
|
||||||
fi
|
|
||||||
done
|
|
||||||
}
|
|
||||||
|
|
||||||
# export -f emit_event
|
|
||||||
# export -f list_events
|
|
||||||
# export -f filter_events
|
|
||||||
|
|
@ -8,11 +8,6 @@ log() {
|
||||||
echo "[$(date +'%Y-%m-%d %H:%M:%S')] [$level] $message"
|
echo "[$(date +'%Y-%m-%d %H:%M:%S')] [$level] $message"
|
||||||
}
|
}
|
||||||
|
|
||||||
# --- Load Events Library ---
|
|
||||||
if [[ -f "${LIB_PATH:-$(dirname "${BASH_SOURCE[0]}")}/events.sh" ]]; then
|
|
||||||
source "${LIB_PATH:-$(dirname "${BASH_SOURCE[0]}")}/events.sh"
|
|
||||||
fi
|
|
||||||
|
|
||||||
# Structured log for machine reading
|
# Structured log for machine reading
|
||||||
# timestamp, stage, host, service, command_result, info
|
# timestamp, stage, host, service, command_result, info
|
||||||
struct_log() {
|
struct_log() {
|
||||||
|
|
@ -22,34 +17,7 @@ struct_log() {
|
||||||
local result=$4
|
local result=$4
|
||||||
local info=$5
|
local info=$5
|
||||||
log "STRUCT" "stage=$stage host=$host service=$service result=$result info=\"$info\""
|
log "STRUCT" "stage=$stage host=$host service=$service result=$result info=\"$info\""
|
||||||
|
|
||||||
# Emit event if it matches normalized types
|
|
||||||
local event_type=""
|
|
||||||
local severity="info"
|
|
||||||
|
|
||||||
case "$stage" in
|
|
||||||
"deploy")
|
|
||||||
if [[ "$result" == "success" ]]; then
|
|
||||||
event_type="deployment_completed"
|
|
||||||
elif [[ "$result" == "fail" ]]; then
|
|
||||||
event_type="deployment_failed"
|
|
||||||
severity="error"
|
|
||||||
else
|
|
||||||
event_type="deployment_started"
|
|
||||||
fi
|
|
||||||
;;
|
|
||||||
"validate")
|
|
||||||
if [[ "$result" == "fail" ]]; then
|
|
||||||
event_type="deployment_failed"
|
|
||||||
severity="error"
|
|
||||||
fi
|
|
||||||
;;
|
|
||||||
esac
|
|
||||||
|
|
||||||
if [[ -n "$event_type" ]] && command -v emit_event >/dev/null 2>&1; then
|
|
||||||
emit_event "$event_type" "$severity" "deploy.sh" "$service" "${TIMESTAMP:-$(date +%s)}" "{\"stage\": \"$stage\", \"info\": \"$info\"}"
|
|
||||||
fi
|
|
||||||
}
|
}
|
||||||
|
|
||||||
# export -f log
|
export -f log
|
||||||
# export -f struct_log
|
export -f struct_log
|
||||||
|
|
|
||||||
|
|
@ -4,27 +4,14 @@
|
||||||
# Check if the container is running
|
# Check if the container is running
|
||||||
if ! docker ps --filter "name=mosquitto" --filter "status=running" | grep -q "mosquitto"; then
|
if ! docker ps --filter "name=mosquitto" --filter "status=running" | grep -q "mosquitto"; then
|
||||||
echo "[FAIL] Mosquitto container is not running"
|
echo "[FAIL] Mosquitto container is not running"
|
||||||
if command -v emit_event >/dev/null 2>&1; then
|
|
||||||
emit_event "service_unhealthy" "error" "healthcheck.sh" "mosquitto" "$(date +%s)" "{\"reason\": \"container_not_running\"}"
|
|
||||||
fi
|
|
||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
|
|
||||||
# Basic port check for 1883
|
# Basic port check for 1883
|
||||||
if ! (echo > /dev/tcp/localhost/1883) >/dev/null 2>&1; then
|
if ! (echo > /dev/tcp/localhost/1883) >/dev/null 2>&1; then
|
||||||
echo "[FAIL] Mosquitto port 1883 is not reachable"
|
echo "[FAIL] Mosquitto port 1883 is not reachable"
|
||||||
if command -v emit_event >/dev/null 2>&1; then
|
|
||||||
emit_event "service_unhealthy" "error" "healthcheck.sh" "mosquitto" "$(date +%s)" "{\"reason\": \"port_unreachable\", \"port\": 1883}"
|
|
||||||
fi
|
|
||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
|
|
||||||
echo "[OK] Mosquitto is healthy"
|
echo "[OK] Mosquitto is healthy"
|
||||||
if command -v emit_event >/dev/null 2>&1; then
|
|
||||||
# Optional: could emit service_recovered if it was previously unhealthy
|
|
||||||
# For now, just a generic healthcheck_success or similar if needed,
|
|
||||||
# but the requirements mentioned service_recovered.
|
|
||||||
# Logic for recovery usually requires state.
|
|
||||||
emit_event "service_recovered" "info" "healthcheck.sh" "mosquitto" "$(date +%s)" "{\"status\": \"healthy\"}"
|
|
||||||
fi
|
|
||||||
exit 0
|
exit 0
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue