Architecture
NavFlow is built as a set of loosely coupled services connected by NATS JetStream. Each service has a single responsibility, and the platform separates the control plane (management) from the data plane (event ingestion and output).
Services
Control Plane
The control plane is a Go service that handles:
- User authentication (JWT-based signup/login)
- Project management (CRUD, API keys, members)
- Pipeline configuration (filter expressions, transforms, agent endpoint)
- Sink configuration (webhook URLs, Slack webhooks)
- Internal APIs consumed by other services to fetch project configs
It stores all state in PostgreSQL and connects to NATS for service coordination. The NavFlow dashboard talks exclusively to the control plane API.
Receiver (Data Plane)
The receiver is the data ingestion service. It exposes three groups of HTTP endpoints:
- OTLP ingest (`/v1/logs`, `/v1/traces`, `/v1/metrics`) — accepts OpenTelemetry data from your applications. Validates the `X-API-Key` header, unpacks OTLP batches into flat JSON records, and publishes each record to the project’s NATS raw stream.
- HTTP/JSON events (`/v1/events`, `/v1/events/batch`) — accepts arbitrary JSON objects from any source (webhooks, custom applications, third-party integrations). Supports optional `X-Event-Type` and `X-Source` headers for metadata injection. Events are published directly to the project’s NATS raw stream.
- Agent output (`/internal/agent-output`) — accepts enriched results from AI agents via the NavFlow SDK. Validates the API key and publishes the payload directly to the project’s NATS output stream.
The receiver is the only service that external clients talk to for data. The control plane never touches event payloads.
Pipeline
The pipeline service supports two modes:
Legacy mode (window_enabled: false):
- Trigger — consumes from the project’s raw NATS stream, applies an expr-lang trigger expression, and drops non-matching records.
- Transform — applies field transforms.
- Bridge — collects records into batches (default: 10), marshals them into a JSON array, and POSTs the batch to the agent endpoint.
Window mode (window_enabled: true):
- Transform — applies field transforms.
- Trigger check — evaluates the trigger expression. If matched, reads the current sliding window from Redis and forwards `{trigger, window}` to the agent.
- Context filter — if the event passes the context filter, stores it in the Redis window for future triggers.
Events expire from the window naturally via Redis TTL (configurable duration, default 300 seconds).
A pipeline is only started for projects that have an agent endpoint URL configured. Projects without an agent are not consumed from.
Sink
The sink service consumes from the project’s NATS output stream (where agent results land) and dispatches each message to configured destinations:
- Webhook — HTTP POST to a URL with configurable method and headers
- Slack — Posts to a Slack incoming webhook URL
AI Agent (your service)
Agents are standalone services that you build and host. NavFlow calls your agent’s HTTP endpoint when events match your trigger rules. An agent:
- Receives events via HTTP POST from the pipeline (JSON array in batch mode, or trigger + window object in window mode)
- Processes them (classification, enrichment, summarization, anomaly detection, etc.)
- Sends results back to the receiver via the NavFlow Python SDK
See Agents for details on building agents.
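Since the pipeline posts two different payload shapes, an agent's HTTP handler typically normalizes them first. The helper below is a hypothetical sketch of that step (wrap it in whatever HTTP framework you prefer); the field names come from the modes described above:

```python
def handle_payload(payload):
    """Normalize the two shapes the pipeline sends:
    - legacy mode: a JSON array of records
    - window mode: {"trigger": <event>, "window": [<events>]}
    Returns (trigger_event_or_None, list_of_context_records)."""
    if isinstance(payload, list):
        return None, payload                      # legacy batch
    return payload["trigger"], payload.get("window", [])
```

After processing, results go back to the receiver via the NavFlow Python SDK, which posts to `/internal/agent-output` on your behalf.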
Data flow
Your Application / Webhook Source
│
│ OTLP HTTP (/v1/logs) or JSON (/v1/events)
│ X-API-Key header
▼
┌─────────────────────┐
│ Receiver │ ← Data Plane
│ (port 4318) │
└────────┬────────────┘
│ Flat JSON records
▼
NATS Raw Stream
(nf-ai-raw-{projectID})
│
▼
┌─────────────────────┐
│ Pipeline │
│ ┌───────────────┐ │
│ │ Legacy mode: │ │
│ │ 1. Trigger │ │ ← Drop non-matching records
│ │ 2. Transform │ │ ← Reshape fields
│ │ 3. Batch │ │ ← Collect into JSON array
│ │ 4. Forward │ │ ← HTTP POST to agent
│ │ │ │
│ │ Window mode: │ │
│ │ 1. Transform │ │ ← Reshape fields
│ │ 2. Trigger │ │ ← Check expression, read Redis window
│ │ 3. Forward │ │ ← POST {trigger, window} to agent
│ │ 4. Store │ │ ← Write to Redis window (if context filter matches)
│ └───────────────┘ │
└────────┬────────────┘
│ JSON array batch
▼
┌─────────────────────┐
│ AI Agent │ ← Your service
│ (e.g. port 8000) │
└────────┬────────────┘
│ NavFlow SDK
│ POST /internal/agent-output
▼
┌─────────────────────┐
│ Receiver │ ← Agent output ingest
└────────┬────────────┘
│
▼
NATS Output Stream
(nf-ai-output-{projectID})
│
▼
┌─────────────────────┐
│ Sink │
│ → Webhook │
│ → Slack │
└─────────────────────┘
NATS streams
Each project gets two dedicated JetStream streams:
| Stream | Subject | Producer | Consumer |
|---|---|---|---|
| nf-ai-raw-{id} | nf-ai-raw.{id}.in | Receiver | Pipeline |
| nf-ai-output-{id} | nf-ai-output.{id}.out | Receiver (agent output) | Sink |
All streams use durable consumers with explicit ACK policy, so unacknowledged messages are redelivered rather than lost.
There is no intermediate “filtered” stream. In legacy mode, the pipeline processes records in-memory: raw stream → trigger → transform → batch → HTTP forward. In window mode, Redis serves as an intermediate store for the sliding context window, but NATS still has no filtered stream.
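The naming scheme above is mechanical, so it can be captured in small helpers. The functions below mirror the table exactly; the commented consumer sketch assumes a reachable JetStream server and the `nats-py` client, and is not NavFlow's actual consumer code:

```python
# Per-project stream and subject names, as listed in the table above.
def raw_stream(project_id: str) -> str:
    return f"nf-ai-raw-{project_id}"

def raw_subject(project_id: str) -> str:
    return f"nf-ai-raw.{project_id}.in"

def output_subject(project_id: str) -> str:
    return f"nf-ai-output.{project_id}.out"

# With nats-py, a durable pull consumer on the raw stream might look like:
#
#   import nats
#   nc = await nats.connect("nats://localhost:4222")
#   js = nc.jetstream()
#   sub = await js.pull_subscribe(raw_subject("p1"), durable="pipeline-p1")
#   for msg in await sub.fetch(10):
#       process(msg.data)
#       await msg.ack()   # explicit ACK, matching the stream policy
```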
Event formats
OTLP logs
The receiver unpacks OTLP log batches into individual flat JSON records:
{
"resourceAttributes": {
"service.name": "my-service",
"k8s.pod.name": "my-pod-abc123",
"k8s.namespace.name": "production"
},
"scope": {
"name": "my-library"
},
"severityNumber": 17,
"severityText": "ERROR",
"body": "Connection refused: database pool exhausted",
"attributes": {
"error.type": "ConnectionError"
},
"traceId": "abc123...",
"spanId": "def456...",
"timestamp": "2024-01-15T10:30:00Z"
}
HTTP/JSON events
Events sent via /v1/events are stored as-is — any valid JSON object works:
{
"type": "order_failed",
"user_id": "u123",
"amount": 99.99,
"error": "payment_declined",
"_event_type": "order",
"_source": "checkout-service"
}
The _event_type and _source fields are injected from the optional X-Event-Type and X-Source headers.
Trigger expressions, transforms, and context window filters operate on whatever fields are present in the event — they work the same way regardless of the source.
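For example, trigger expressions are expr-lang, so against the two records above they could look like the following (field names taken from those examples; availability of map indexing on nested attributes depends on how the pipeline exposes the record):

```
severityNumber >= 17 && resourceAttributes["k8s.namespace.name"] == "production"

type == "order_failed" && amount > 50
```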
Technology choices
| Component | Technology | Why |
|---|---|---|
| Backend | Go + Huma framework | Low overhead, fast startup, single binary per service |
| Frontend | Next.js 15 + React 19 | App Router, server components, standalone mode |
| Messaging | NATS JetStream | Durable streaming, lightweight, per-project isolation |
| Database | PostgreSQL | Reliable, widely supported, handles auth + config |
| Window Store | Redis (redis-stack-server) | Native TTL, sorted sets for temporal ordering, shared across replicas |
| AI Agents | Python + OpenAI Agents SDK | Rich AI ecosystem, easy to build custom agents |
| SDK | Python (requests) | Simple HTTP client, no complex dependencies |