# Agents
AI agents are HTTP services that receive events from NavFlow, process them, and send enriched results back. You build and host the agent — NavFlow handles the data delivery, filtering, context windows, and output routing.
## How agents work
- NavFlow’s pipeline filters and transforms incoming events, then POSTs matching events to your agent’s endpoint.
- Your agent processes the events (classification, enrichment, summarization, anomaly detection — whatever you build).
- Your agent sends results back to NavFlow via the Python SDK, which POSTs to the receiver's `/internal/agent-output` endpoint.
- NavFlow's sink picks up the output and dispatches it to your configured webhooks or Slack channels.
## Request format (batch mode)
NavFlow sends a POST request to your agent with:
Headers:

```
Content-Type: application/json
X-Project-ID: <project-uuid>
X-Request-ID: <unique-request-id>
```
Body: A JSON array of filtered, transformed records:
```json
[
  {
    "resourceAttributes": {
      "service.name": "payment-service",
      "k8s.pod.name": "payment-abc123"
    },
    "severityNumber": 17,
    "severityText": "ERROR",
    "body": "Connection refused: database pool exhausted",
    "attributes": {},
    "traceId": "...",
    "spanId": "...",
    "timestamp": "2024-01-15T10:30:00Z"
  },
  {
    "resourceAttributes": {
      "service.name": "payment-service"
    },
    "severityNumber": 17,
    "severityText": "ERROR",
    "body": "Timeout waiting for DB connection after 30s",
    "attributes": {},
    "traceId": "...",
    "spanId": "...",
    "timestamp": "2024-01-15T10:30:01Z"
  }
]
```

## Request format (context window mode)
When context windows are enabled, your agent receives the trigger event plus its sliding window of context instead of a flat array:
Body:
```json
{
  "trigger": {
    "severityNumber": 17,
    "severityText": "ERROR",
    "body": "payment failed",
    "resourceAttributes": {"service.name": "payment"}
  },
  "window": {
    "key": "payment",
    "events": [
      {"data": {"severityText": "WARN", "body": "high latency"}, "timestamp": "..."},
      {"data": {"severityText": "WARN", "body": "retry attempt 2"}, "timestamp": "..."}
    ],
    "stats": {
      "count": 2,
      "duration_ms": 4500,
      "first_at": "...",
      "last_at": "..."
    }
  }
}
```

The `trigger` is the event that fired the invocation. The `window.events` array holds the preceding context events (oldest first). The `stats` object provides summary metadata about the window.
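As a sketch of what a context-window agent can do with this payload, the helper below checks whether the trigger is more severe than everything in its window (a rough escalation signal). The `severity_trend` function and its ranking table are illustrative, not part of NavFlow:

```python
def severity_trend(trigger, window_events):
    """Return whether the trigger is more severe than all context events."""
    order = {"TRACE": 0, "DEBUG": 1, "INFO": 2, "WARN": 3, "ERROR": 4, "FATAL": 5}
    context = [
        order.get(e["data"].get("severityText", "INFO"), 2)
        for e in window_events
    ]
    trigger_level = order.get(trigger.get("severityText", "INFO"), 2)
    return {
        "escalating": bool(context) and trigger_level > max(context),
        "context_count": len(context),
    }

# Shapes mirror the context-window body above.
trigger = {"severityText": "ERROR", "body": "payment failed"}
events = [
    {"data": {"severityText": "WARN", "body": "high latency"}},
    {"data": {"severityText": "WARN", "body": "retry attempt 2"}},
]
print(severity_trend(trigger, events))
# → {'escalating': True, 'context_count': 2}
```

Two WARN events followed by an ERROR trigger reads as an escalation; an empty window never does.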
Use different agent endpoints for each mode: `/process` for batch mode and `/process-window` for context window mode. Set the appropriate endpoint URL in your project's pipeline configuration.
## Sending results back
Install the NavFlow Python SDK:
```shell
pip install navflow
```

Use it to send output back to NavFlow:
```python
from navflow import NavFlow

nf = NavFlow(
    api_key="your-project-api-key",
    endpoint="https://receiver.navflow.ai",
)

nf.send_output(
    payload={
        "summary": "Database connection pool exhausted",
        "severity": "critical",
        "root_cause": "Too many concurrent connections",
        "remediation": "Increase max_connections or add connection pooling",
    },
    request_id=request_id,  # from the X-Request-ID header
)
```

## Building an agent
A minimal agent is a FastAPI server with a `/process` endpoint:
```python
from fastapi import FastAPI, Header, Request
from navflow import NavFlow

app = FastAPI()
nf = NavFlow(
    api_key="your-project-api-key",
    endpoint="https://receiver.navflow.ai",
)

@app.post("/process")
async def process(
    request: Request,
    x_project_id: str = Header(default=""),
    x_request_id: str = Header(default=""),
):
    records = await request.json()
    # Your processing logic here
    output = {"processed": len(records), "results": [...]}
    nf.send_output(payload=output, request_id=x_request_id)
    return {"status": "ok"}

@app.get("/healthz")
async def healthz():
    return {"status": "ok"}
```

Deploy this anywhere that’s publicly reachable — a cloud VM, serverless function, container service, or any platform that can run an HTTP server.
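The processing step itself can be anything. As a minimal sketch, here is a pure-Python pass that summarizes a batch of records by service and severity before any model call; the `summarize_batch` helper is illustrative, not part of NavFlow:

```python
from collections import Counter

def summarize_batch(records):
    """Count batch records per service.name and per severityText."""
    by_service = Counter(
        r.get("resourceAttributes", {}).get("service.name", "unknown")
        for r in records
    )
    by_severity = Counter(r.get("severityText", "UNSET") for r in records)
    return {"by_service": dict(by_service), "by_severity": dict(by_severity)}

# Record shapes mirror the batch request body above.
records = [
    {"resourceAttributes": {"service.name": "payment-service"}, "severityText": "ERROR"},
    {"resourceAttributes": {"service.name": "payment-service"}, "severityText": "ERROR"},
]
print(summarize_batch(records))
# → {'by_service': {'payment-service': 2}, 'by_severity': {'ERROR': 2}}
```

A summary like this can feed the agent's output payload directly, or gate whether a more expensive enrichment step runs at all.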
## Building a context window agent
For agents that receive context windows, add a `/process-window` endpoint:
```python
@app.post("/process-window")
async def process_window(
    request: Request,
    x_request_id: str = Header(default=""),
):
    payload = await request.json()
    trigger = payload["trigger"]  # the event that fired
    window = payload["window"]    # recent related events + stats

    # Your temporal analysis logic
    analysis = await analyze_with_context(trigger, window["events"], window["stats"])
    nf.send_output(payload=analysis, request_id=x_request_id)
    return {"status": "ok"}
```

Set your project’s agent endpoint to `https://your-agent.example.com/process-window` to use context window mode.
## Example: OTLP Error Summary
NavFlow provides a reference agent that uses the OpenAI Agents SDK to classify and enrich error logs. It demonstrates both batch and context window modes.
It runs two AI agents in sequence:
- Classifier — identifies the error type (e.g. `ConnectionPoolExhausted`) and severity (`critical`, `warning`, or `informational`)
- Enricher — produces a summary, a root-cause hypothesis, remediation steps, and a recommendation on whether to notify
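The two-stage sequence can be sketched in plain Python. The functions below are stand-ins for the LLM-backed agents in the reference implementation, with made-up matching rules and placeholder output fields:

```python
def classify(record):
    # Stand-in for the Classifier agent (the real one is LLM-backed).
    body = record.get("body", "").lower()
    if "pool exhausted" in body or "connection refused" in body:
        return {"error_type": "ConnectionPoolExhausted", "severity": "critical"}
    return {"error_type": "Unknown", "severity": "informational"}

def enrich(record, classification):
    # Stand-in for the Enricher agent: summary, root cause,
    # remediation, and a notify flag.
    return {
        **classification,
        "summary": record.get("body", ""),
        "root_cause": "unknown",       # the real agent hypothesizes a cause
        "remediation": "investigate",  # ...and concrete remediation steps
        "notify": classification["severity"] == "critical",
    }

record = {"body": "Connection refused: database pool exhausted"}
result = enrich(record, classify(record))
# result["error_type"] == "ConnectionPoolExhausted", result["notify"] is True
```

The point is the shape of the pipeline (classify first, enrich on the classification), not the toy rules.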
In context window mode, it uses a single `WindowAnalyzer` agent that analyzes the trigger event in the context of the preceding events — identifying escalation patterns, correlations, and temporal relationships.
See the example agent source code for implementation details.
## Connecting your agent to NavFlow
- Build and deploy your agent at a publicly accessible URL
- Create an API key in your project’s dashboard
- Set the agent endpoint in the project’s Pipeline configuration to your agent’s URL
- Configure the SDK in your agent with your API key and the NavFlow receiver endpoint (`https://receiver.navflow.ai`)
Your agent endpoint must be publicly reachable by NavFlow. Make sure your hosting environment allows incoming HTTPS connections on the port your agent listens on.
## Agent requirements
Your agent must:
- Accept `POST` requests with a JSON body (an array for batch mode, an object for window mode)
- Return a `2xx` status code on success (NavFlow retries on `5xx`)
- Send output back via the NavFlow SDK, or POST to the receiver's `/internal/agent-output` endpoint with the `X-API-Key` header
Your agent can be written in any language — the SDK is optional. You can POST results directly to the receiver's `/internal/agent-output` endpoint.
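For agents that skip the SDK, a minimal sketch using only the Python standard library follows. The endpoint path and `X-API-Key` header come from this page; the JSON body shape (`{"payload": ..., "request_id": ...}`) mirrors the SDK's `send_output` arguments but is an assumption to verify against your receiver:

```python
import json
import urllib.request

def build_output_request(payload, request_id, api_key,
                         endpoint="https://receiver.navflow.ai"):
    # NOTE: the body shape here is assumed, not documented above.
    body = json.dumps({"payload": payload, "request_id": request_id}).encode()
    return urllib.request.Request(
        f"{endpoint}/internal/agent-output",
        data=body,
        headers={"Content-Type": "application/json", "X-API-Key": api_key},
        method="POST",
    )

def post_output(payload, request_id, api_key):
    """Send agent output to the receiver; returns the HTTP status code."""
    req = build_output_request(payload, request_id, api_key)
    with urllib.request.urlopen(req) as resp:
        return resp.status  # expect 2xx on success
```

The same two-call shape (build the request, then send it) translates directly to any language with an HTTP client.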