Architecture

NavFlow is built as a set of loosely coupled services connected by NATS JetStream. Each service has a single responsibility, and the platform separates the control plane (management) from the data plane (event ingestion and output).

Services

Control Plane

The control plane is a Go service that handles:

  • User authentication (JWT-based signup/login)
  • Project management (CRUD, API keys, members)
  • Pipeline configuration (filter expressions, transforms, agent endpoint)
  • Sink configuration (webhook URLs, Slack webhooks)
  • Internal APIs consumed by other services to fetch project configs

It stores all state in PostgreSQL and connects to NATS for service coordination. The NavFlow dashboard talks exclusively to the control plane API.

Receiver (Data Plane)

The receiver is the data ingestion service. It exposes three groups of HTTP endpoints:

  • OTLP ingest (/v1/logs, /v1/traces, /v1/metrics) — accepts OpenTelemetry data from your applications. Validates the X-API-Key header, unpacks OTLP batches into flat JSON records, and publishes each record to the project’s NATS raw stream.
  • HTTP/JSON events (/v1/events, /v1/events/batch) — accepts arbitrary JSON objects from any source (webhooks, custom applications, third-party integrations). Supports optional X-Event-Type and X-Source headers for metadata injection. Events are published directly to the project’s NATS raw stream.
  • Agent output (/internal/agent-output) — accepts enriched results from AI agents via the NavFlow SDK. Validates the API key and publishes the payload directly to the project’s NATS output stream.

The receiver is the only service that external clients talk to for data. The control plane never touches event payloads.
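
As an illustration, a JSON event posted to /v1/events with the optional metadata headers ends up on the raw stream with the corresponding fields injected. The API key, host, and event payload below are placeholders; only the endpoint path and header names come from the description above:

```python
import json

# Placeholder key and payload; header names match the receiver's contract.
headers = {
    "X-API-Key": "YOUR_PROJECT_KEY",
    "X-Event-Type": "order",          # optional -> injected as _event_type
    "X-Source": "checkout-service",   # optional -> injected as _source
    "Content-Type": "application/json",
}
event = {"type": "order_failed", "user_id": "u123", "amount": 99.99}

# With an HTTP client this would be roughly:
#   requests.post("https://<receiver-host>/v1/events",
#                 data=json.dumps(event), headers=headers)

# What the receiver publishes to the raw stream: the event stored as-is,
# plus the metadata fields injected from the optional headers.
stored = dict(event)
stored["_event_type"] = headers["X-Event-Type"]
stored["_source"] = headers["X-Source"]
print(json.dumps(stored, indent=2))
```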

Pipeline

The pipeline service supports two modes:

Legacy mode (window_enabled: false):

  1. Trigger — consumes from the project’s raw NATS stream, applies an expr-lang trigger expression, and drops non-matching records.
  2. Transform — applies field transforms.
  3. Bridge — collects records into batches (default: 10), marshals them into a JSON array, and POSTs the batch to the agent endpoint.

Window mode (window_enabled: true):

  1. Transform — applies field transforms.
  2. Trigger check — evaluates the trigger expression. If matched, reads the current sliding window from Redis and forwards {trigger, window} to the agent.
  3. Context filter — if the event passes the context filter, stores it in the Redis window for future triggers.

Events expire from the window naturally via Redis TTL (configurable duration, default 300 seconds).

A pipeline is started only for projects that have an agent endpoint URL configured; the raw streams of projects without an agent are never consumed.

Sink

The sink service consumes from the project’s NATS output stream (where agent results land) and dispatches each message to configured destinations:

  • Webhook — HTTP POST to a URL with configurable method and headers
  • Slack — Posts to a Slack incoming webhook URL
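
The fan-out can be sketched as building one HTTP request per configured destination. The config field names below are assumptions for illustration, not NavFlow's actual sink schema; the Slack payload shape ({"text": ...}) is the standard incoming-webhook format:

```python
import json

def build_dispatches(message: dict, sinks: list[dict]) -> list[dict]:
    """Illustrative only: turn one output-stream message into HTTP request
    descriptions for each configured destination."""
    out = []
    for sink in sinks:
        if sink["kind"] == "webhook":
            out.append({
                "method": sink.get("method", "POST"),  # configurable method
                "url": sink["url"],
                "headers": {**sink.get("headers", {}),
                            "Content-Type": "application/json"},
                "body": json.dumps(message),
            })
        elif sink["kind"] == "slack":
            # Slack incoming webhooks expect a JSON body with a "text" field.
            out.append({
                "method": "POST",
                "url": sink["url"],
                "headers": {"Content-Type": "application/json"},
                "body": json.dumps({"text": json.dumps(message, indent=2)}),
            })
    return out

dispatches = build_dispatches(
    {"summary": "3 failed orders in 5m"},
    [{"kind": "webhook", "url": "https://example.com/hook", "method": "PUT"},
     {"kind": "slack", "url": "https://hooks.slack.com/services/..."}],
)
```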

AI Agent (your service)

Agents are standalone services that you build and host. NavFlow calls your agent’s HTTP endpoint when events match your trigger rules. An agent:

  1. Receives events via HTTP POST from the pipeline (JSON array in batch mode, or trigger + window object in window mode)
  2. Processes them (classification, enrichment, summarization, anomaly detection, etc.)
  3. Sends results back to the receiver via the NavFlow Python SDK

See Agents for details on building agents.
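
Stripped of HTTP framing and the SDK call, a batch-mode agent reduces to a handler like the one below. The classification logic is a placeholder, and `handle_batch` is a hypothetical name; in a real agent this body runs inside your HTTP handler and the results go back through the NavFlow SDK:

```python
def handle_batch(batch: list[dict]) -> list[dict]:
    """Placeholder agent logic: classify each record by OTLP severity.
    severityNumber >= 17 corresponds to ERROR-level logs in OTLP."""
    results = []
    for record in batch:
        severity = record.get("severityNumber", 0)
        results.append({
            "body": record.get("body"),
            "classification": "error" if severity >= 17 else "info",
        })
    return results

results = handle_batch([
    {"severityNumber": 17, "body": "Connection refused"},
    {"severityNumber": 9, "body": "request served"},
])
```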

Data flow

Your Application / Webhook Source
        │ OTLP HTTP (/v1/logs) or JSON (/v1/events)
        │ X-API-Key header
┌─────────────────────┐
│      Receiver       │  ← Data Plane
│     (port 4318)     │
└────────┬────────────┘
         │ Flat JSON records
NATS Raw Stream (nf-ai-raw-{projectID})
┌─────────────────────┐
│      Pipeline       │
│ ┌───────────────┐   │
│ │ Legacy mode:  │   │
│ │ 1. Trigger    │   │ ← Drop non-matching records
│ │ 2. Transform  │   │ ← Reshape fields
│ │ 3. Batch      │   │ ← Collect into JSON array
│ │ 4. Forward    │   │ ← HTTP POST to agent
│ │               │   │
│ │ Window mode:  │   │
│ │ 1. Transform  │   │ ← Reshape fields
│ │ 2. Trigger    │   │ ← Check expression, read Redis window
│ │ 3. Forward    │   │ ← POST {trigger, window} to agent
│ │ 4. Store      │   │ ← Write to Redis window (if context filter matches)
│ └───────────────┘   │
└────────┬────────────┘
         │ JSON array batch
┌─────────────────────┐
│      AI Agent       │  ← Your service
│  (e.g. port 8000)   │
└────────┬────────────┘
         │ NavFlow SDK
         │ POST /internal/agent-output
┌─────────────────────┐
│      Receiver       │  ← Agent output ingest
└────────┬────────────┘
NATS Output Stream (nf-ai-output-{projectID})
┌─────────────────────┐
│        Sink         │
│  → Webhook          │
│  → Slack            │
└─────────────────────┘

NATS streams

Each project gets two dedicated JetStream streams:

Stream                 Subject                  Producer                  Consumer
nf-ai-raw-{id}         nf-ai-raw.{id}.in        Receiver                  Pipeline
nf-ai-output-{id}      nf-ai-output.{id}.out    Receiver (agent output)   Sink

All streams use durable consumers with an explicit ACK policy, so unacknowledged messages are redelivered rather than lost.

There is no intermediate “filtered” stream. In legacy mode, the pipeline processes records in-memory: raw stream → trigger → transform → batch → HTTP forward. In window mode, Redis serves as an intermediate store for the sliding context window, but NATS still has no filtered stream.
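
The in-memory legacy path can be sketched as a single loop. The consumer wiring and function names are stand-ins: `records` represents the NATS raw-stream consumer and `forward` the HTTP POST to the agent endpoint; the batch size of 10 is the documented default:

```python
BATCH_SIZE = 10  # documented default batch size

def run_legacy_pipeline(records, trigger, transform, forward):
    """Sketch of the legacy path: trigger -> transform -> batch -> forward."""
    batch = []
    for record in records:
        if not trigger(record):          # 1. drop non-matching records
            continue
        batch.append(transform(record))  # 2. reshape fields
        if len(batch) >= BATCH_SIZE:     # 3. collect into a JSON-array batch
            forward(batch)               # 4. POST the batch to the agent
            batch = []
    if batch:
        forward(batch)                   # flush the final partial batch

sent = []
run_legacy_pipeline(
    records=[{"ok": True}] * 12 + [{"ok": False}],
    trigger=lambda r: r["ok"],
    transform=lambda r: r,
    forward=sent.append,
)
```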

Event formats

OTLP logs

The receiver unpacks OTLP log batches into individual flat JSON records:

{
  "resourceAttributes": {
    "service.name": "my-service",
    "k8s.pod.name": "my-pod-abc123",
    "k8s.namespace.name": "production"
  },
  "scope": { "name": "my-library" },
  "severityNumber": 17,
  "severityText": "ERROR",
  "body": "Connection refused: database pool exhausted",
  "attributes": { "error.type": "ConnectionError" },
  "traceId": "abc123...",
  "spanId": "def456...",
  "timestamp": "2024-01-15T10:30:00Z"
}

HTTP/JSON events

Events sent via /v1/events are stored as-is — any valid JSON object works:

{
  "type": "order_failed",
  "user_id": "u123",
  "amount": 99.99,
  "error": "payment_declined",
  "_event_type": "order",
  "_source": "checkout-service"
}

The _event_type and _source fields are injected from optional X-Event-Type and X-Source headers.

Trigger expressions, transforms, and context window filters operate on whatever fields are present in the event — they work the same way regardless of the source.
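
For instance, a trigger that matches either ERROR logs or failed orders fires on OTLP records and JSON events alike. NavFlow evaluates expr-lang expressions, so the Python predicate below is only an analogy, not the real evaluator:

```python
# Python analogue of a trigger expression such as:
#   severityText == "ERROR" or type == "order_failed"
def trigger(event: dict) -> bool:
    # Fields absent from an event simply fail to match; the source is irrelevant.
    return (event.get("severityText") == "ERROR"
            or event.get("type") == "order_failed")

otlp_record = {"severityText": "ERROR", "body": "Connection refused"}
json_event = {"type": "order_failed", "user_id": "u123"}
other = {"type": "order_placed"}

matches = [trigger(e) for e in (otlp_record, json_event, other)]
```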

Technology choices

Component      Technology                    Why
Backend        Go + Huma framework           Low overhead, fast startup, single binary per service
Frontend       Next.js 15 + React 19         App Router, server components, standalone mode
Messaging      NATS JetStream                Durable streaming, lightweight, per-project isolation
Database       PostgreSQL                    Reliable, widely supported, handles auth + config
Window Store   Redis (redis-stack-server)    Native TTL, sorted sets for temporal ordering, shared across replicas
AI Agents      Python + OpenAI Agents SDK    Rich AI ecosystem, easy to build custom agents
SDK            Python (requests)             Simple HTTP client, no complex dependencies