# Building Convergence – A Journey from Network Observability to AI-Driven Automation Part 5a: AI-Driven Automation Agent
We’ve been building Convergence across four parts: Nautobot as the source of truth, OpenTelemetry as the collection pipeline, VictoriaMetrics and Loki for storage, Grafana for visualization, and then a whole threat-intelligence layer in Part 4 that enriches the top blocked IPs with AbuseIPDB, GreyNoise, OTX, and IPInfo scores, all fed into a Claude Haiku narrative that tells you, in plain English, what’s hitting your firewall and why you should care.
Part 4 was satisfying. But it was still passive. You’d read a report, maybe manually add an IP to a pfBlockerNG list. The loop wasn’t closed.
Part 5 closes the loop. This is a two-part writeup because there’s too much to cover in one post. This first part covers the agent’s core: how the polling loop works, how IPs get filtered and deduplicated, how Claude builds a structured action proposal, how the rate limiter and block count tracking work, and why each piece is designed the way it is. Part 5b covers the other half: the Discord bot approval flow, pfSense execution paths, the executor’s baseline-verify-rollback cycle, and the GAIT audit trail.
Code is on the `phase5-automation-agent` branch.
## Why this architecture
The obvious question when building any AI automation is: what happens when it’s wrong? Threat intelligence has false positives. Composite scores aren’t perfect. A CDN IP with a high abuse score might be shared infrastructure that half your household uses. Blocking it automatically, in the middle of the night, because a number crossed a threshold is a bad outcome.
So the agent has a two-tier threshold system:
- Score ≥ 80: qualifies for analysis and a Claude action proposal
- Score 80–94: goes to Discord for human approval before anything executes
- Score ≥ 95: auto-executes — if something scores that high, it’s almost certainly a scanner or C2 node, not shared infra
The human approval window exists specifically for the ambiguous middle. The auto-execute threshold is set high enough that false positives there are genuinely rare. And underneath all of it, DRY_RUN=true is the default: the agent logs everything and simulates execution without touching pfSense until you’ve verified the flow and are ready to go live.
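The two-tier gate reduces to a small pure function. This sketch (names are illustrative, not the agent's actual code) mirrors the AUTO_ACTION_THRESHOLD / AUTO_APPROVE_THRESHOLD defaults:

```python
def gate(score: int, auto_threshold: int = 80, approve_threshold: int = 95) -> str:
    """Map a composite score onto the two-tier threshold system.

    Illustrative only; defaults mirror AUTO_ACTION_THRESHOLD and
    AUTO_APPROVE_THRESHOLD from the agent's config.
    """
    if score < auto_threshold:
        return "ignore"          # below the floor: never analyzed
    if score < approve_threshold:
        return "needs_approval"  # the ambiguous middle: a human decides in Discord
    return "auto_execute"        # near-certain bad actor: act immediately

print(gate(79), gate(88), gate(97))  # → ignore needs_approval auto_execute
```

The point of writing it this way is that the ambiguous middle always routes to a human; only near-certain scores act unattended.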
## Service structure
The agent lives in services/automation-agent/. It’s a FastAPI app with APScheduler for the polling loop and discord.py for the bot.
```
app/
├── main.py                # FastAPI entrypoint, /health, /approve endpoint, GAIT REST API
├── config.py              # Pydantic settings — all env vars in one place
├── scheduler.py           # APScheduler: poll → analyze → gate → execute
├── state.py               # In-memory pending approvals dict
├── metrics.py             # Prometheus counters and gauges
├── actions/
│   ├── pfblocker.py       # pfSense execution: REST API → XML-RPC → SSH waterfall
│   ├── executor.py        # execute → verify → rollback → outcome
│   ├── baseline.py        # VictoriaMetrics pre/post snapshot queries
│   └── rate_limiter.py    # Sliding-window hourly cap, per-IP dedup, block count
├── analysis/
│   └── claude_action.py   # Prompt builder + Claude API call
├── notifications/
│   ├── discord.py         # Webhook: approval embeds + outcome messages
│   └── discord_bot.py     # Bot: /approve /reject /approve-all /reject-all /pending
└── audit/
    └── git_trail.py       # GAIT — GitAuditTrail + AuditSession (covered in Part 5b)
```
## The polling loop
APScheduler runs a background thread that fires every POLL_INTERVAL_SECONDS (default 600, so every 10 minutes). The first interesting problem is that APScheduler jobs are synchronous (the function it calls can’t be a coroutine), but everything else in the app is async: httpx for HTTP calls, redis.asyncio for Redis, the Anthropic SDK. Running asyncio.run() inside a job would spin up a second event loop and break everything.
The correct bridge is asyncio.run_coroutine_threadsafe(). It submits a coroutine to an already-running event loop from outside that loop, which is exactly what we need here:
```python
# scheduler.py
_loop: asyncio.AbstractEventLoop | None = None


def _job_wrapper() -> None:
    """Synchronous bridge: APScheduler background thread → async poll_cycle."""
    if _loop is None:
        logger.error("Event loop not captured; skipping poll")
        return
    future = asyncio.run_coroutine_threadsafe(poll_cycle(), _loop)
    try:
        future.result(timeout=max(settings.poll_interval_seconds - 30, 60))
    except Exception:
        logger.exception("Automation poll job raised at the top level")


def start_scheduler(loop: asyncio.AbstractEventLoop) -> None:
    global _scheduler, _loop
    _loop = loop  # captured from uvicorn's event loop during FastAPI startup
    _scheduler = BackgroundScheduler(daemon=True)
    _scheduler.add_job(
        _job_wrapper,
        "interval",
        seconds=settings.poll_interval_seconds,
        id="automation_poll",
        max_instances=1,  # prevents overlap if a poll runs long
        coalesce=True,    # skip missed runs instead of stacking them
    )
    _scheduler.start()
```
The max_instances=1 / coalesce=True combination matters: if a poll cycle takes longer than the interval (say, Claude is slow or pfSense is unresponsive), APScheduler won’t stack a second job on top of the first. It just skips the missed fire and waits for the next scheduled time.
The event loop reference is captured during FastAPI startup via the lifespan context manager, which runs on uvicorn’s main async loop:
```python
# main.py
@asynccontextmanager
async def lifespan(_app: FastAPI):
    start_scheduler(asyncio.get_running_loop())  # passes the running loop to the scheduler
    await start_bot()
    yield
    await stop_bot()
```
## Fetching and filtering high-risk IPs
Each poll cycle calls fetch_high_risk_ips(), which hits three endpoints on the threat-intel service. Two are flat lists (fast, good for filtering), one is the full report (slower but has the complete enrichment context that Claude needs):
```python
# scheduler.py — fetch_high_risk_ips()
async with httpx.AsyncClient(timeout=20.0) as client:
    blocked_resp = await client.get(f"{settings.threat_intel_url}/api/infinity/blocked_ips")
    outbound_resp = await client.get(f"{settings.threat_intel_url}/api/infinity/outbound_suspicious")
    full_resp = await client.get(f"{settings.threat_intel_url}/api/report")
```
The flat endpoints return a trimmed list: IP, score, is_known_bad_actor, country, org. The full report has the nested intel dict with all enrichment fields. We cross-reference them: filter from the flat list, pull the full context from the report. If the full report is unavailable (timeout, service down), we reconstruct a minimal intel dict from the flat fields so the flow can still run.
An IP qualifies if all three of these are true:

- `composite_score >= AUTO_ACTION_THRESHOLD` (default 80)
- `is_known_bad_actor == true` — must be flagged by at least one threat feed
- `likely_false_positive == false` — explicit false-positive heuristic from the threat-intel service (CDN ranges, legit crawlers, etc.)
Outbound suspicious IPs get a slightly higher bar: max(AUTO_ACTION_THRESHOLD, 85). The logic is conservative: blocking an inbound scanner hurts nothing if you’re wrong, but blocking an outbound destination affects every user on the network trying to reach that host.
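Combining the three qualification checks with the outbound bar, the filter can be sketched like this (a simplified stand-in, not the actual `_build_entry()` logic):

```python
def qualifies(flat: dict, direction: str, base_threshold: int = 80) -> bool:
    """Illustrative sketch of the qualification filter described above.

    Field names and the max(base, 85) outbound bar mirror the text;
    this is not the agent's real code.
    """
    threshold = max(base_threshold, 85) if direction == "outbound" else base_threshold
    return (
        flat.get("score", 0) >= threshold
        and bool(flat.get("is_known_bad_actor", False))
        and not flat.get("likely_false_positive", False)
    )

print(qualifies({"score": 86, "is_known_bad_actor": True}, "inbound"))   # → True
print(qualifies({"score": 82, "is_known_bad_actor": True}, "outbound"))  # → False: outbound bar is 85
```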
The _build_entry() helper applies all of this and returns a clean dict or None:
```python
def _build_entry(flat: dict, direction: str, full_lookup: dict) -> dict | None:
    ip = flat.get("ip", "")
    score = flat.get("score", 0)
    is_bad = flat.get("is_known_bad_actor", False)
    if not ip or score < settings.auto_action_threshold or not is_bad:
        return None

    full_entry = full_lookup.get(ip, {})
    intel = full_entry.get("intel", {})

    # Reconstruct intel from flat fields if full report wasn't available
    if not intel:
        intel = {
            "composite_score": score,
            "is_known_bad_actor": is_bad,
            "org": flat.get("org", ""),
            "country": flat.get("country", ""),
            "abuse_confidence_score": flat.get("abuse_score", 0),
            "threat_level": flat.get("threat_level", "unknown"),
            "pulse_count": flat.get("otx_pulses", 0),
            "gn_classification": flat.get("greynoise", "unknown"),
            "likely_false_positive": False,
            "riot": False,
        }

    if intel.get("likely_false_positive"):
        logger.debug("Skipping likely FP: %s", ip)
        return None

    return {
        "ip": ip,
        "score": score,
        "direction": direction,
        "count": flat.get("events", 0),
        "intel": intel,
        "narrative": narrative_text,  # executive summary from the full report
    }
```
## Deduplication: two layers
The first real operational problem after going live was the bot spamming Discord. Every 10-minute poll would re-evaluate all pending IPs (ones already sitting in the approval queue) and fire new alerts for each of them. After a few cycles with 20+ pending approvals, the channel was unreadable.
The fix is two-layered because a single layer has a gap.
Layer 1 — Redis TTL. When an IP enters the approval queue (or gets skipped for any reason), mark_ip_processed() sets a Redis key with a 4-hour TTL. Subsequent polls see the key and return immediately:
```python
# rate_limiter.py
_PROC_PREFIX = "automation:processed:"


async def is_ip_already_processed(ip: str) -> bool:
    r = get_redis()
    return bool(await r.exists(f"{_PROC_PREFIX}{ip}"))


async def mark_ip_processed(ip: str, ttl_hours: int = 4) -> None:
    r = get_redis()
    await r.set(f"{_PROC_PREFIX}{ip}", "1", ex=ttl_hours * 3600)
```
This works for the normal case, but not after a container restart: the Redis keys are gone and the first poll will re-alert on everything.
Layer 2 — In-memory check. Before even hitting Redis, the scheduler checks state.pending_approvals, which is a plain Python dict that lives in memory for the lifetime of the container:
```python
# scheduler.py — process_ip()
# Layer 1: Redis dedup (survives normal operation)
if await is_ip_already_processed(ip):
    logger.debug("IP %s already processed recently; skipping", ip)
    return

# Layer 2: In-memory dedup (catches the post-restart gap before Redis is repopulated)
already_pending = any(
    data.get("ip") == ip for data in state.pending_approvals.values()
)
if already_pending:
    logger.debug("IP %s already awaiting approval in memory; skipping", ip)
    return
```
After a restart, the first poll fires with empty Redis. Any IPs still in state.pending_approvals get caught by Layer 2. Everything else goes through the approval flow again, which unavoidably means one more Discord notification. But after that first post-restart poll, both layers are populated and stable.
The key detail that was originally missing: mark_ip_processed() must be called on every exit path: auto-execute, dry-run, no-action, and needs-approval. The original code only had it on the first three, so every time a poll cycle hit the needs_approval branch it would skip the mark and the next poll would re-evaluate the same IP.
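One way to make that "every exit path" guarantee structural rather than relying on discipline is a try/finally around the branch logic. A minimal sketch, with stand-ins for the real decide/mark functions:

```python
import asyncio

processed: set[str] = set()


async def mark_ip_processed(ip: str) -> None:
    processed.add(ip)  # stands in for the Redis SET with a 4-hour TTL


async def decide(ip: str) -> str:
    # Stand-in for the real branch logic (auto-execute / dry-run / no-action / needs-approval)
    return "needs_approval"


async def process_ip(ip: str) -> str:
    try:
        return await decide(ip)
    finally:
        # Runs on every return path *and* on exceptions, so no branch can skip the mark
        await mark_ip_processed(ip)

print(asyncio.run(process_ip("203.0.113.9")), "203.0.113.9" in processed)  # → needs_approval True
```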
## Claude’s action proposal
The prompt is the heart of the analysis. The goal is not just “is this IP bad?” but “given everything we know about this IP and this network, what specific pfSense action should we take, and for how long?” That’s a more constrained, more useful question.
claude_action.py has two functions. build_action_prompt() assembles the prompt string (and is exported separately so the scheduler can record the exact text to GAIT before calling the model). propose_action() calls the API and parses the response.
The prompt structure is worth looking at in full because the design decisions matter:
```python
# claude_action.py — build_action_prompt() (abbreviated)
return f"""You are a network security automation agent for a home/SOHO pfSense firewall.
Your task: propose ONE safe, reversible pfSense blocking action for a high-risk IP.

THREAT DATA:
IP: {ip}
Direction: {threat_data.get("direction")} (inbound=WAN, outbound=LAN→internet)
Events (1h): {volume_label}
Block history: {repeat_label}
Intel: {json.dumps(prompt_intel, indent=4)}

THREAT NARRATIVE (excerpt from threat-intel service):
{narrative_excerpt}

PRE-ACTION BASELINE METRICS (from VictoriaMetrics):
{baseline_metrics}

SAFETY RULES (hard constraints — always apply):
1. If likely_false_positive is true → MUST output type: "no_action"
2. If composite_score < {settings.auto_action_threshold} → MUST output type: "no_action"
3. Never block RFC 1918 private IPs (10.x, 172.16-31.x, 192.168.x)
4. Never block known CDN/infrastructure orgs (Cloudflare, Akamai, Fastly,
   Google, Apple, Microsoft) UNLESS abuse_confidence_score > 80
5. For outbound suspicious: only propose a block if composite_score > 85
   AND abuse_confidence_score > 60

DURATION GUIDELINES (escalate based on history):
- First sighting, high score: 24 hours
- Borderline score: 12 hours
- Persistent bad actor (pulses > 5, abuse > 70): 72 hours
- Blocked {settings.repeat_offender_threshold}+ times OR {settings.high_volume_threshold}+ events/hour:
  → Use 168 hours (7 days) AND include "recommend_permanent_block": true in notes

Respond ONLY with valid JSON (no markdown, no code fences):
{{
  "type": "pfblocker_add" | "no_action",
  "target_list": "{_PFBLOCKER_LIST}",
  "value": "x.x.x.x/32",
  "reason": "concise reason citing specific intel",
  "duration_hours": 24,
  "confidence": "high" | "medium" | "low",
  "notes": "any caveats or recommended follow-up steps"
}}"""
```
A few deliberate choices here:
The baseline metrics (current VictoriaMetrics traffic counters for this IP) are included so Claude can distinguish between an IP that’s actively hammering the firewall right now vs. one that has a bad reputation but hasn’t been seen recently. That’s a meaningful difference for duration and urgency.
The safety rules are written as hard constraints, not soft guidance. “MUST output type: no_action” for false positives leaves no room for Claude to weigh the constraint against other factors. For a network automation agent this is important: you want the model to treat these as absolute stops, not as preferences to balance.
The duration guidelines escalate based on history so that repeat offenders get longer blocks without requiring a separate escalation flow. Claude is doing the escalation as part of the proposal.
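For intuition, the duration ladder can be approximated as a deterministic function. The real agent leaves this judgment to the model, so the sketch below is purely illustrative; the thresholds mirror the prompt text:

```python
def suggest_duration(score: int, pulses: int, abuse: int,
                     block_count: int, hourly_events: int) -> tuple[int, bool]:
    """Approximate the prompt's duration guidelines as a pure function.

    Returns (hours, recommend_permanent_block). Thresholds mirror the
    prompt; the actual agent lets Claude make this call with context.
    """
    if block_count >= 5 or hourly_events >= 50:
        return 168, True   # repeat offender or high volume: 7 days + permanent-block note
    if pulses > 5 and abuse > 70:
        return 72, False   # persistent bad actor
    if score < 85:
        return 12, False   # borderline score
    return 24, False       # first sighting, high score

print(suggest_duration(score=92, pulses=2, abuse=40, block_count=0, hourly_events=3))  # → (24, False)
```

The reason the real system routes this through the model rather than a function like this is that the narrative and baseline metrics can justify deviations a fixed ladder can't express.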
## How `propose_action()` handles the response
The API call itself is straightforward, but the response handling has a few guards that are easy to miss:
```python
# claude_action.py — propose_action()
message = await client.messages.create(
    model=_MODEL,  # claude-haiku-4-5-20251001
    max_tokens=_MAX_TOKENS,
    messages=[{"role": "user", "content": prompt}],
)
raw_text = message.content[0].text.strip()
if not raw_text:
    return {"type": "no_action", "reason": "empty_response", "confidence": "none"}

# Strip optional markdown code fences — models sometimes add them despite instructions
if raw_text.startswith("```"):
    lines = raw_text.splitlines()
    end = -1 if lines[-1].strip() == "```" else len(lines)
    raw_text = "\n".join(lines[1:end]).strip()

parsed = json.loads(raw_text)
parsed["model"] = _MODEL
parsed["prompt_tokens"] = message.usage.input_tokens
parsed["completion_tokens"] = message.usage.output_tokens
return parsed
```
The code fence strip is necessary. Even with an explicit “no markdown” instruction, Haiku occasionally wraps the JSON in ```` ```json ... ``` ```` fences. The fence-stripping logic handles this gracefully rather than crashing on a `JSONDecodeError`.
Token counts are added to the parsed dict and committed to GAIT. This makes it possible to see, after the fact, exactly how much context the model received and what it cost.
Any failure (network error, non-JSON response, empty reply) returns {"type": "no_action"} rather than propagating an exception. This is a safety default: a broken AI call should never accidentally trigger an action.
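The same fail-closed pattern in miniature (a sketch, not the service's exact error handling):

```python
import json

_NO_ACTION = {"type": "no_action", "reason": "parse_failure", "confidence": "none"}


def parse_proposal(raw_text: str) -> dict:
    """Illustrative failure-to-no_action wrapper around response parsing."""
    try:
        if not raw_text.strip():
            raise ValueError("empty response")
        return json.loads(raw_text)
    except (ValueError, json.JSONDecodeError):
        # Fail closed: anything we can't parse becomes a no-op, never an action
        return dict(_NO_ACTION)

print(parse_proposal("not json")["type"])  # → no_action
```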
## Repeat offender tracking
The original agent had no memory of whether it had blocked an IP before. It could evaluate and block the same IP every 24 hours indefinitely, treating each occurrence as a fresh first sighting.
Adding block count tracking required changes across four files. The data model is simple: a Redis counter per IP with a one-year TTL:
```python
# rate_limiter.py
_BLOCK_COUNT_PREFIX = "automation:block_count:"
_BLOCK_COUNT_TTL = 365 * 24 * 3600  # 1 year — long enough to track persistent threats


async def get_block_count(ip: str) -> int:
    """Return the lifetime number of times this IP has been blocked."""
    r = get_redis()
    val = await r.get(f"{_BLOCK_COUNT_PREFIX}{ip}")
    return int(val) if val else 0


async def increment_block_count(ip: str) -> int:
    """Increment the lifetime block count and return the new total."""
    r = get_redis()
    key = f"{_BLOCK_COUNT_PREFIX}{ip}"
    count = await r.incr(key)
    await r.expire(key, _BLOCK_COUNT_TTL)
    return int(count)
```
INCR in Redis is atomic, so there is no race condition on concurrent increments. The one-year TTL means the counter doesn’t accumulate forever for IPs that were briefly malicious and then stopped; anything truly gone for a year effectively resets.
The scheduler fetches the block count and adds it to threat_data before Claude sees the prompt:
```python
# scheduler.py — process_ip()
# Enrich threat_data with block history before Claude analysis
threat_data["block_count"] = await get_block_count(ip)
```
This lets the prompt’s repeat_label and volume_label variables give Claude clear, contextual signals:
```python
# claude_action.py — build_action_prompt()
block_count = threat_data.get("block_count", 0)
hourly_events = threat_data.get("count", 0)

if block_count >= settings.repeat_offender_threshold:  # default: 5
    repeat_label = (
        f"⚠️ REPEAT OFFENDER — blocked {block_count} time(s) previously. "
        f"Consider recommending permanent block list addition."
    )
elif block_count > 0:
    repeat_label = f"Previously blocked {block_count} time(s)."
else:
    repeat_label = "First time seen by this agent."

if hourly_events >= settings.high_volume_threshold:  # default: 50
    volume_label = (
        f"⚠️ HIGH VOLUME — {hourly_events} events in the last hour. "
        f"Actively hammering the network. Consider recommending permanent block."
    )
else:
    volume_label = f"{hourly_events} events in the last hour."
```
The thresholds (5 lifetime blocks or 50 events in the last hour) are configurable in .env. The reasoning: an IP blocked 5 separate times has demonstrated persistent malicious behavior that warrants a different response than a first-time scanner. An IP generating 50 events per hour is actively hammering the network right now, regardless of block history. Either signal independently justifies escalation to a 168-hour block and a recommend_permanent_block note.
After a successful live block, increment_block_count() fires in executor.py and the new total is included in the Discord outcome message:
```python
# executor.py — after successful execution
total_blocks = await increment_block_count(ip)
if total_blocks >= settings.repeat_offender_threshold:
    repeat_note = (
        f" — ⚠️ **{total_blocks}x blocked total** (repeat offender — "
        f"consider adding to a permanent block list)"
    )
elif total_blocks > 1:
    repeat_note = f" — blocked {total_blocks}x total"
```
The block history also appears in the Discord approval embed, so when you’re reviewing a pending proposal you can see at a glance whether this is a first-time visitor or someone who’s been back four times already.
## Rate limiting: the sliding window
The hourly action cap (MAX_ACTIONS_PER_HOUR, default 5) uses a Redis sorted set with timestamps as both member and score. This gives a true sliding window rather than a fixed hour boundary:
```python
# rate_limiter.py
_RATE_KEY = "automation:actions_this_hour"


async def check_rate_limit() -> bool:
    """Return True if the service is under the max_actions_per_hour cap."""
    r = get_redis()
    now = int(time.time())
    window_start = now - 3600
    pipe = r.pipeline()
    pipe.zremrangebyscore(_RATE_KEY, 0, window_start)  # prune entries older than 60 min
    pipe.zcard(_RATE_KEY)                              # count remaining
    pipe.expire(_RATE_KEY, 7200)                       # safety TTL so key doesn't persist forever
    results = await pipe.execute()
    current_count = int(results[1])
    return current_count < settings.max_actions_per_hour


async def record_action_taken() -> None:
    """Called after each live action executes."""
    r = get_redis()
    now = int(time.time())
    await r.zadd(_RATE_KEY, {str(now): float(now)})
    await r.expire(_RATE_KEY, 7200)
```
The pipeline matters here: ZREMRANGEBYSCORE (prune), ZCARD (count), and EXPIRE execute atomically as one round trip. Without the pipeline you’d have a race condition where a concurrent check could see the un-pruned count.
Why a sorted set rather than a simple counter? A counter resets at a fixed boundary (e.g., top of the hour). If the agent fires 5 actions at 2:59 PM, a counter would reset at 3:00 PM and allow 5 more immediately. The sorted set tracks actual timestamps: those 2:59 entries aren’t pruned until 3:59 PM.
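The difference is easy to demonstrate without Redis. A toy model of the sorted-set semantics, with t=0 standing in for the 2:59 PM burst:

```python
def in_window(timestamps: list[int], now: int, window: int = 3600) -> int:
    """Pure-Python model of the sorted-set count: entries newer than now - window.

    Mirrors the ZREMRANGEBYSCORE + ZCARD semantics; illustrative only.
    """
    return sum(1 for t in timestamps if t > now - window)

# Five actions fired at t=0 (the 2:59 PM burst)
actions = [0, 0, 0, 0, 0]
print(in_window(actions, now=60))    # one minute later: still 5 — no boundary reset
print(in_window(actions, now=3601))  # a full hour later: 0 — entries have aged out
```

A fixed-boundary counter would report 0 at now=60 if the hour rolled over, which is exactly the gap the sorted set closes.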
One deliberate asymmetry: this cap applies to the scheduler’s automated decisions only. The Discord bot’s /approve-all command bypasses it. The reasoning is that MAX_ACTIONS_PER_HOUR is a safety rail against unattended automation going rogue overnight. A human consciously reviewing a queue of 20 IPs and hitting /approve-all is a different situation: they’re exercising judgment, not triggering a cascade. record_action_taken() still fires inside execute_and_verify() for every live block, so the metrics stay accurate; only the gate check is removed from the human approval path.
## Prometheus metrics
The agent exposes /metrics for VictoriaMetrics to scrape. The metrics are designed to answer two questions: “is the agent working?” and “is it doing too much or too little?”
| Metric | Type | What it tracks |
|---|---|---|
| `automation_actions_total{status}` | Counter | Actions by outcome: success, fail, dry_run, pending, skipped |
| `automation_polls_total` | Counter | Total scheduler poll cycles |
| `automation_qualifying_ips_total` | Counter | IPs that passed the score/bad-actor filter each poll |
| `automation_rate_limited_total` | Counter | IPs skipped because the hourly cap was hit |
| `automation_pending_approvals` | Gauge | Current pending approval queue size |
| `automation_session_duration_seconds` | Histogram | End-to-end per-session processing time |
| `automation_audit_commits_total` | Counter | GAIT commits (useful as a sanity check) |
| `automation_last_poll_timestamp` | Gauge | Unix time of last poll — goes stale if scheduler stops |
The automation_last_poll_timestamp gauge is the most important liveness signal. If it stops updating, the scheduler has died. The Grafana Automation dashboard can alert on staleness.
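A staleness check in the same spirit as that alert can be sketched in a few lines (the two-interval grace factor here is an illustrative choice, not from the agent):

```python
import time


def scheduler_is_stale(last_poll_ts: float, poll_interval: int = 600, grace: int = 2) -> bool:
    """Flag the scheduler as dead if the last poll is older than `grace` intervals.

    Mirrors an alert one might build on automation_last_poll_timestamp;
    poll_interval matches POLL_INTERVAL_SECONDS, grace is an assumption.
    """
    return time.time() - last_poll_ts > grace * poll_interval

print(scheduler_is_stale(time.time()))         # just polled → False
print(scheduler_is_stale(time.time() - 7200))  # two hours of silence → True
```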
## Configuration reference
New .env variables for Phase 5:
```bash
# pfSense — connection
PFSENSE_HOST=192.168.1.1
PFSENSE_VERIFY_SSL=false

# pfSense — XML-RPC Path B (most common, no REST API key needed)
PFSENSE_XMLRPC_USER=admin
PFSENSE_XMLRPC_PASS=your_pfsense_password
PFSENSE_XMLRPC_TARGET=alias        # "alias" or "pfblockerng"
PFSENSE_FIREWALL_ALIAS=AutoAgent_Block_v4

# Discord — webhook (one-way outcome notifications, always used)
DISCORD_WEBHOOK_URL=https://discord.com/api/webhooks/...

# Discord — bot (two-way /approve /reject slash commands)
DISCORD_BOT_TOKEN=your_bot_token
DISCORD_GUILD_ID=your_server_id    # enables instant slash command sync vs ~1h global delay

# Safety controls
DRY_RUN=true                       # set false when ready to go live
AUTO_ACTION_THRESHOLD=80           # minimum score to evaluate
AUTO_APPROVE_THRESHOLD=95          # score above which Discord approval is skipped
MAX_ACTIONS_PER_HOUR=5             # unattended automation cap

# Block duration and escalation
BLOCK_TTL_HOURS=24
REPEAT_OFFENDER_THRESHOLD=5        # lifetime blocks before permanent block recommendation
HIGH_VOLUME_THRESHOLD=50           # events/hour before escalation to 168h

# GAIT audit trail
AUDIT_REPO_PATH=/app/audit-repo
AUDIT_GIT_USER_NAME=Convergence AutoAgent
AUDIT_GIT_USER_EMAIL=autoagent@convergence.local
```
Everything is a Pydantic BaseSettings field in `config.py` with a sensible default. `model_config = SettingsConfigDict(env_file=".env", extra="ignore")` means you can add new fields without breaking existing deployments that haven’t updated their env files yet.
## What’s next
Part 5b covers the execution half:

- the Discord bot’s five slash commands and the engineering behind the approval flow
- the three pfSense execution paths (REST API, XML-RPC, SSH) and the transport bug that required abandoning xmlrpc.client
- the asyncio write lock that fixed the /approve-all race condition
- the executor’s baseline-sleep-verify-rollback cycle
- the baseline and verification system that uses VictoriaMetrics PromQL queries to measure whether the block actually worked
- GAIT, the full audit trail system
Code: https://github.com/byrn-baker/Convergence/tree/phase5-automation-agent
