Building Convergence – A Journey from Network Observability to AI-Driven Automation Part 4: AI Threat Intelligence Service

Welcome back to the “Building Convergence” series! If you’ve been following along, Part 1 laid out the big-picture vision: Nautobot as the golden source of truth, OpenTelemetry collecting everything, VictoriaMetrics/Loki/Grafana making it queryable and visual. Part 2 got the pipes in place—SNMP polling for Cisco switches, syslog ingestion from pfSense, GeoIP enrichment, and the first log-to-metrics counts. Part 3 turned that data into proactive defense: real-time Geomap threat plots, Loki ruler spike detection, Discord-paged alerts on country blocks or switch reboots, and cleanly separated Network/Security dashboards.

Now in Phase 4, we cross the line from observability into intelligent analysis. We introduce a dedicated threat-intel microservice that pulls the top blocked and outbound IPs from VictoriaMetrics, enriches them with external threat feeds (AbuseIPDB, GreyNoise, OTX, IPInfo), runs port analysis via Loki, feeds the context to Claude Haiku for pfSense-aware threat narratives and remediation recommendations, and surfaces everything in a brand-new “Threat Intelligence” Grafana dashboard using the Infinity datasource.

This is where the convergence really starts to feel magical—raw telemetry becomes contextual, AI-generated security intelligence tailored to your firewall. I’ll dive deep into the technical guts: configuration, Python code structure and snippets, metrics rationale, dashboard building, threat scoring logic, display mechanics, AI’s role, and the most painful bugs with their root causes and fixes.

All code lives in the Convergence repo on the phase4 branch. Spin it up with docker compose up -d --build after adding your API keys.


Overview

Phase 4 delivers:

  1. Automated IP Enrichment — Hourly job queries top blocked src IPs and outbound dst IPs, caches results (24h TTL), hits four threat intel APIs.
  2. Composite Scoring & Bad-Actor Flagging — Weighted score (0–100), RIOT-aware false-positive heuristics, outbound C2 detection.
  3. Port Attack Analysis — Loki LogQL extracts blocked ports from filterlog CSV, maps to known services (SNMP, RDP-Alt, etc.).
  4. Claude Haiku Narratives — pfSense-specific threat summaries, top threats with org/ASN context, false-positive flags, and exact remediation steps (Firewall Rules, pfBlockerNG lists, Diagnostics > States).
  5. Grafana Threat Intelligence Dashboard — Executive summary, AI narrative panels, scored IP tables, suspicious outbound views, port breakdowns—all via Infinity + VictoriaMetrics.

The service exposes Prometheus metrics (scraped by VictoriaMetrics) and JSON endpoints optimized for Infinity.


Architecture

```
VictoriaMetrics (firewall_events_total + threat_intel_* metrics)
   │  PromQL queries (top IPs, interfaces, ports)
   ▼
threat-intel service (FastAPI + APScheduler) :8001
   ├─ Hourly + startup enrichment:
   │   ├─ Top 50 blocked src_ips + top 20 outbound dst_ips
   │   ├─ Active pfSense interfaces (for Claude context)
   │   ├─ Redis cache (24h TTL per IP)
   │   ├─ Enrich → AbuseIPDB / GreyNoise / OTX / IPInfo
   │   ├─ Loki port extraction from filterlog CSV
   │   └─ Claude Haiku → JSON narrative + actions
   ├─ /health, /metrics (Prometheus), /api/report, /api/infinity/*
   ▼
Infinity datasource (proxy to threat-intel:8000)
   ▼
Grafana “Threat Intelligence” dashboard
   ├─ Executive Summary (risk gauge, bad actor counts)
   ├─ AI Narrative (summary + pfSense actions)
   ├─ Inbound/Outbound Threat Scores
   ├─ Port Analysis (top ports, critical services)
   └─ Enriched IP Tables
```

All /api/infinity/* endpoints return arrays—even single objects wrapped in [{}]. This is required for reliable Infinity backend-proxy behavior.
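That wrapping rule is easy to enforce in one place. A minimal sketch—the helper name `wrap_for_infinity` is mine, not from the repo:

```python
from typing import Any

def wrap_for_infinity(payload: Any) -> list:
    """Ensure an API response body is a JSON array. Infinity's backend
    parser is array-first: a bare object produces zero rows in Grafana,
    so single objects must be returned as one-element arrays."""
    if isinstance(payload, list):
        return payload
    return [payload]
```

Every `/api/infinity/*` route can funnel its return value through a helper like this, so a future endpoint can't accidentally regress to a bare object.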


Configuration and Setup

To get this running, we add a new service to docker-compose.yml: the threat-intel container based on a custom Dockerfile (Python 3.11 slim with FastAPI, APScheduler, httpx, pydantic-settings, anthropic, prometheus_client, redis). It depends on Redis (for caching), VictoriaMetrics (for queries), and Loki (for ports).

Modified files include:

  • docker-compose.yml: New service + Grafana’s Infinity plugin via GF_INSTALL_PLUGINS.
  • config/victoriametrics/prometheus.yml: Scrape job for /metrics.
  • config/grafana/provisioning/datasources/threat-intel.yaml: Infinity datasource with proxy access and allowedHosts.
  • config/grafana/provisioning/dashboards/convergence.yaml: New “Threat Intelligence” folder.
  • .env: API keys (more below).

API keys are crucial—add them to .env and force-recreate the container to load them:

| Variable | Source | Free Tier | Required? |
|---|---|---|---|
| ANTHROPIC_API_KEY | Anthropic Console | Pay-per-token | Yes |
| ABUSEIPDB_API_KEY | AbuseIPDB | 1,000/day | Recommended |
| GREYNOISE_API_KEY | GreyNoise | ~60/min unauth | Optional |
| OTX_API_KEY | AlienVault OTX | Free | Recommended |
| IPINFO_TOKEN | IPInfo | 50k/month | Recommended |

Other env vars (in config.py via Pydantic):

| Variable | Default | Description |
|---|---|---|
| ENRICHMENT_INTERVAL_SECONDS | 3600 | Hourly runs |
| TOP_BLOCKED_IPS | 50 | Blocked to enrich |
| TOP_OUTBOUND_IPS | 20 | Outbound to enrich |
| CACHE_TTL_SECONDS | 86400 | 24h cache |
| ABUSEIPDB_DAILY_BUDGET | 900 | Cap to avoid free-tier overage |
| LOOKBACK_HOURS | 1 | PromQL window |
| REDIS_URL | redis://redis:6379/0 | Cache conn |
| VICTORIAMETRICS_URL | http://victoriametrics:8428 | Query base |
| LOKI_URL | http://loki:3100 | Port query base |
| DATA_DIR | /app/data | For port_services.json |
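The real service loads these via pydantic-settings in config.py; as a rough stand-in, here is a stdlib-only sketch with the same defaults (the function name `load_settings` is illustrative):

```python
import os

def load_settings(env=os.environ) -> dict:
    """Illustrative stand-in for the Pydantic settings model.
    Defaults mirror the table above; env vars override them."""
    return {
        "enrichment_interval_seconds": int(env.get("ENRICHMENT_INTERVAL_SECONDS", 3600)),
        "top_blocked_ips": int(env.get("TOP_BLOCKED_IPS", 50)),
        "top_outbound_ips": int(env.get("TOP_OUTBOUND_IPS", 20)),
        "cache_ttl_seconds": int(env.get("CACHE_TTL_SECONDS", 86400)),
        "abuseipdb_daily_budget": int(env.get("ABUSEIPDB_DAILY_BUDGET", 900)),
        "lookback_hours": int(env.get("LOOKBACK_HOURS", 1)),
        "redis_url": env.get("REDIS_URL", "redis://redis:6379/0"),
        "victoriametrics_url": env.get("VICTORIAMETRICS_URL", "http://victoriametrics:8428"),
        "loki_url": env.get("LOKI_URL", "http://loki:3100"),
        "data_dir": env.get("DATA_DIR", "/app/data"),
    }
```

Pydantic adds type validation and clearer startup errors on malformed values, which is why the service doesn't just read os.environ directly.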

The Python Code: Structure and Key Snippets

The service is a FastAPI app in services/threat-intel/app/main.py, modularized for maintainability:

  • main.py: FastAPI routes (/health, /metrics, /api/report, /api/infinity/*)
  • scheduler.py: APScheduler background job (startup + hourly)
  • state.py: Global latest_report dict (shared between scheduler and API)
  • metrics.py: Prometheus gauges/counters
  • models.py: Pydantic schemas
  • enrichment/: Cache, API clients, aggregator
  • analysis/: VM PromQL client, Loki port extraction, Claude prompt builder

Core flow (enrich_and_analyze() in scheduler):

  1. Query VictoriaMetrics for top blocked/outbound IPs + interfaces
  2. Check Redis cache (24h TTL)
  3. Enrich uncached IPs async (with rate guards)
  4. Aggregate composite scores + flags
  5. Extract ports from Loki
  6. Build Claude prompt → generate narrative
  7. Update latest_report + set Prometheus metrics
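Steps 2–3 hinge on the cache behavior. The service uses Redis (SET with a 24-hour expiry, keys of the form `threat-intel:ip:<ip>`—the same pattern the verification section greps for); this in-memory sketch shows the equivalent TTL logic, with an injectable clock so it's testable:

```python
import time

CACHE_TTL_SECONDS = 86400  # 24h, matching the service default

class TTLCache:
    """In-memory stand-in for the Redis enrichment cache."""
    def __init__(self, now=time.monotonic):
        self._now = now          # injectable clock for testing
        self._store = {}

    def get(self, ip: str):
        entry = self._store.get(f"threat-intel:ip:{ip}")
        if entry is None:
            return None
        expires_at, value = entry
        if self._now() >= expires_at:
            return None          # expired: caller re-enriches the IP
        return value

    def set(self, ip: str, value: dict, ttl: int = CACHE_TTL_SECONDS):
        self._store[f"threat-intel:ip:{ip}"] = (self._now() + ttl, value)
```

With four external APIs and hourly runs over largely repeating top-talker IPs, a 24h TTL keeps the AbuseIPDB free tier comfortably under budget.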

Example aggregator snippet:

```python
def aggregate_enrichment(ip: str, sources: dict) -> dict:
    abuse_score = sources.get('abuseipdb', {}).get('abuseConfidenceScore', 0)
    otx_pulses = sources.get('otx', {}).get('pulse_count', 0)
    gn_class = sources.get('greynoise', {}).get('classification', 'benign')

    # 50% AbuseIPDB, 30% OTX (capped at 20 pulses), 20% GreyNoise class
    composite = (
        abuse_score * 0.5 +
        min(otx_pulses, 20) / 20 * 30 +
        {'malicious': 20, 'unknown': 5, 'noise': 5}.get(gn_class, 0)
    )

    # Thresholds match the levels documented below:
    # none (<10), low (10–24), medium (25–49), high (50–74), critical (75+)
    if composite >= 75:
        level = 'critical'
    elif composite >= 50:
        level = 'high'
    elif composite >= 25:
        level = 'medium'
    elif composite >= 10:
        level = 'low'
    else:
        level = 'none'

    return {
        'ip': ip,
        'composite_score': round(composite, 1),
        'threat_level': level,
        'abuse_score': abuse_score,
        'otx_pulses': otx_pulses,
        'greynoise_classification': gn_class,
    }
```

Claude prompt builder excerpt:

```python
prompt = f"""
Analyze this pfSense firewall data:
- Overall risk: {summary['overall_risk_level']}
- Top blocked IPs: {json.dumps(top_blocked[:15])}
- Suspicious outbound: {json.dumps(suspicious_outbound)}
- Interfaces: {interfaces}
- Top ports: {top_ports}
- Note: Outbound src_ips are NAT'd—use Diagnostics > States to find internal hosts.

Generate JSON with: executive_summary, top_threats (flag FPs), recommended_actions (pfSense paths like Firewall > Rules > igc0.201).
Avoid blocking benign orgs unless abuse >50.
"""
```

Rate guards:

  • AbuseIPDB: Redis daily counter caps at 900 calls
  • GreyNoise: asyncio.Semaphore(3) + 0.15s sleep inside lock
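The GreyNoise guard is the more subtle of the two: the pacing sleep has to sit inside the semaphore, otherwise all waiters sleep in parallel and the burst still hits the API. A runnable sketch that demonstrates the cap (the demo creates the semaphore inside the coroutine only so it's loop-safe on older Python; the real service keeps it at module level):

```python
import asyncio

async def guarded_lookup(sem: asyncio.Semaphore, ip: str, stats: dict) -> str:
    """Stand-in for a GreyNoise API call, tracking peak concurrency."""
    async with sem:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        # Pacing delay held INSIDE the lock, so it throttles throughput.
        await asyncio.sleep(0.01)  # stand-in for the 0.15s delay + HTTP call
        stats["active"] -= 1
    return ip

async def run_demo(n: int = 20, limit: int = 3) -> int:
    sem = asyncio.Semaphore(limit)
    stats = {"active": 0, "peak": 0}
    await asyncio.gather(*(guarded_lookup(sem, f"10.0.0.{i}", stats) for i in range(n)))
    return stats["peak"]

if __name__ == "__main__":
    print("peak concurrency:", asyncio.run(run_demo()))
```

Even with 20 lookups launched at once, peak in-flight calls never exceed the semaphore limit of 3.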

Prometheus Metrics: What, Why, and Labels

Exposed at /metrics, scraped into VictoriaMetrics for health monitoring and dashboard trends.

| Metric | Labels | Why? |
|---|---|---|
| threat_intel_ip_score | ip, direction, country, org, classification | Per-IP risk timeseries |
| threat_intel_ip_event_count | ip, direction, action | Event volume context |
| threat_intel_abuseipdb_score | ip, direction | Source-specific trends |
| threat_intel_otx_pulses | ip, direction | Malware association tracking |
| threat_intel_greynoise_classification | ip, direction (value encodes class: 2=malicious, etc.) | Behavior categorization |
| threat_intel_known_bad_actor | ip, direction, country | Quick bad-actor filtering |
| threat_intel_port_event_count | port, port_service, risk_level | Port attack volume/risk |
| threat_intel_enrichment_last_success_timestamp | (none) | Health monitoring |
| threat_intel_enrichment_ips_processed_total | (none) | Cycle efficiency |
| threat_intel_cache_hits_total | (none) | Cache effectiveness |

Important: Renamed service → port_service to avoid scrape-config overwrite. Restart VictoriaMetrics after adding scrape job.
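The service emits these with prometheus_client gauges; to make the label model concrete without depending on that library, here's a hand-rolled render of one sample in Prometheus exposition format (the IP/org values are illustrative):

```python
def prometheus_line(name: str, labels: dict, value: float) -> str:
    """Render one sample in Prometheus text exposition format—the shape
    the /metrics endpoint produces for each labeled gauge."""
    label_str = ",".join(f'{k}="{v}"' for k, v in labels.items())
    return f"{name}{{{label_str}}} {value}"

print(prometheus_line(
    "threat_intel_ip_score",
    {"ip": "203.0.113.7", "direction": "inbound", "country": "NL",
     "org": "ExampleHost", "classification": "malicious"},
    87.5,
))
```

Note the port_service label in the rendered output would collide with nothing—whereas a plain `service` label gets clobbered by the scrape config, which is exactly the bug described above.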


Threat Scores: Logic, Sources, and Rationale

Composite Score (0–100): Balanced view avoiding single-source bias.

| Component | Weight | Source | Rationale |
|---|---|---|---|
| AbuseIPDB confidence | 50% | /v2/check | Crowd-reported abuse—highest weight for real-world signal |
| OTX pulse count | 30% | /api/v1/indicators/IPv4/… | Malware/IOC associations—capped at 20 to dampen outliers |
| GreyNoise classification | 20% | /v3/community/{ip} | Scanner/malicious behavior—RIOT flags benign infra (e.g., CDNs) |

Levels: none (<10), low (10–24), medium (25–49), high (50–74), critical (75+).

Outbound C2 flagging: is_known_bad_actor=true if OTX >2 OR Abuse >20 OR GreyNoise malicious OR score >30 (non-RIOT).

False-positive heuristic: GreyNoise RIOT OR org matches big-tech list → likely_false_positive=true. Claude adjusts recommendations accordingly.

Why these sources? Free tiers, complementary signals, no paid subs required.
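Those two heuristics reduce to a small pure function. A sketch—field names mirror my aggregator output but are illustrative, the big-tech list is a placeholder, and I read the "(non-RIOT)" qualifier as applying to the composite-score clause:

```python
BENIGN_ORGS = {"google", "apple", "microsoft", "amazon", "cloudflare", "akamai"}  # illustrative

def flag_ip(enriched: dict) -> dict:
    """Bad-actor and false-positive heuristics from the text above."""
    riot = bool(enriched.get("greynoise_riot", False))
    org = enriched.get("org", "").lower()
    likely_fp = riot or any(b in org for b in BENIGN_ORGS)

    bad_actor = (
        enriched.get("otx_pulses", 0) > 2
        or enriched.get("abuse_score", 0) > 20
        or enriched.get("greynoise_classification") == "malicious"
        or (enriched.get("composite_score", 0) > 30 and not riot)
    )
    return {"is_known_bad_actor": bad_actor, "likely_false_positive": likely_fp}
```

Keeping the flags separate from the composite score means a RIOT-listed CDN can still carry a nonzero score in the tables while the dashboard (and Claude) know not to recommend blocking it.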


Building the Grafana Dashboard

Dashboard: dashboards/threat-intel/threat-intelligence.json (UID: threat-intelligence-v1)

Rows & panels:

  • Executive Summary: Risk stat, bad actor counts, critical ports, max score gauge, timestamp
  • AI Threat Narrative: Summary + actions table + top threats
  • Inbound Threat Scores: Top IPs, AbuseIPDB trend, score timeseries
  • Outbound Traffic Analysis: Suspicious (pre-filtered), all outbound, OTX hits
  • Port Attack Analysis: Top ports bar, critical table, risk pie
  • Enriched IP Tables: Blocked & outbound flat lists
  • Service Health: Enrichment timestamp, IPs processed, cache hits

Key techniques:

  • VictoriaMetrics: topk(10, threat_intel_ip_score{action="block"}) etc.
  • Infinity: Full URLs (http://threat-intel:8000/api/infinity/…), array responses, flat JSON, text = selector for stat panels
  • Fix UID mismatch: Query /api/datasources, replace in JSON

Infinity gotchas: Relative URLs fail, objects need array wrap, nested selectors don’t work in proxy, booleans filter as strings (pre-filter API-side).
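The nested-selector gotcha is why the API flattens everything before it leaves the service. A minimal flattener sketch (the helper name and `_` separator are my choices):

```python
def flatten(obj: dict, parent: str = "", sep: str = "_") -> dict:
    """Flatten nested dicts so Infinity's Go backend can address every
    field by literal key—it treats selectors like intel.country as
    literal keys, not paths."""
    out = {}
    for key, value in obj.items():
        full = f"{parent}{sep}{key}" if parent else key
        if isinstance(value, dict):
            out.update(flatten(value, full, sep))
        else:
            out[full] = value
    return out
```

Run over each enriched record before serialization, every panel selector becomes a plain top-level key like country or intel_country.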

How It’s Displayed

  • Stats/gauges for quick risk/bad-actor overview
  • Tables for detailed IPs (ip, score, org, country, flags)
  • Timeseries for score trends
  • Bars/pies for port distribution/risk
  • Table panels for AI narrative (exec summary, threats, actions)

All panels auto-refresh—Infinity proxies each request to the service, so the data stays fresh.


AI’s Role: Contextual Intelligence

Claude Haiku takes enriched data + pfSense context (interfaces, NAT note) and produces:

  • Executive summary
  • Per-threat analysis (why risky/FP, org/ASN)
  • Remediation steps: exact pfSense UI paths (Firewall > Rules > igc0.201, pfBlockerNG ASN lists, Diagnostics > States)

Prompt engineering ensures tailored output. Token budget: 2000 for detailed actions. Why AI? It connects dots: flags FPs, suggests ASN blocks for clusters, handles NAT limitation intelligently.

The prompt passes to Claude:

  • Summary counts (blocked IPs, bad actors, risk level)
  • Top 15 blocked IPs with: country, org, ASN, composite score, abuse score, OTX pulses, GreyNoise classification, RIOT flag, likely_false_positive heuristic flag
  • All outbound destinations with same enrichment fields
  • Suspicious outbound subset (is_known_bad_actor=true)
  • Active pfSense interface names from VictoriaMetrics (e.g. igc0.201, lagg0.102)
  • Top 10 targeted ports with service and risk level
  • Note explaining pfSense NAT masks internal source IPs

pfSense-specific instructions given to Claude

Claude is instructed to:

  1. top_threats: For each threat, name the IP, org/ASN, why it is or isn’t a real threat, flag false positives explicitly, include CIDR/ASN context
  2. recommended_actions: Use exact pfSense UI paths with real interface names:
    • Single IPs: Firewall > Rules > [INTERFACE] — add block rule for [IP], direction [in/out]
    • ASN ranges: pfBlockerNG > IP > IPv4 — add custom CIDR list for AS[NUMBER]
    • Suspicious outbound (source unknown due to NAT): pfSense Diagnostics > States — filter by [DST_IP]
    • Never recommend blocking known-benign orgs (Google, Apple, etc.) unless abuse_score > 50

Bugs, Root Causes, and Hard-Won Lessons

Building the threat-intel service produced lots of failures. Most were silent issues (blank panels, overwritten labels, empty LLM output, rate-limit storms) rather than crashes.

The table below groups them by category and summarizes the most instructive ones:

| Category | Bug / Symptom | Root Cause | Fix Summary | Key Lesson / Takeaway |
|---|---|---|---|---|
| API Rate Limiting & LLM Reliability | GreyNoise 429 flood after ~10 IPs | 70 IPs → ~70 concurrent GreyNoise calls in <1s (exceeds ~60/min limit) | Module-level asyncio.Semaphore(3) + 0.15s delay inside the lock | Concurrent external calls need global coordination; put delays inside semaphores |
| | Claude returned empty string instead of JSON | Sparse input (many 429s) caused model to emit empty content | Guard: if not raw_text: return {"error": "empty_response"} + log stop_reason | LLMs can fail silently even with strong instructions—always add empty-body checks |
| | Claude wrapped JSON in \`\`\`json fences | Model habit not fully suppressed by prompt | Post-process: strip fences if present | Normalize LLM structured output defensively; instructions alone are not enough |
| Infinity Datasource Gotchas | Relative URLs → https:///api/report (no Host error) | Go backend does not prepend base_url; requires absolute URLs | Use full URLs in every panel target + add url & allowedHosts to datasource YAML | Never rely on relative URLs with backend-proxy plugins |
| | JSON objects → blank stat panels | Infinity backend is array-first; single objects produce zero rows | Return 1-element arrays from API: return [summary_dict] | Design API responses for the weakest consumer (Infinity Go backend) |
| | Stat panels “No Data” despite correct data | Infinity uses column text (not selector) as DataFrame field name | Set text = selector (snake_case); use overrides for display names | Match reduceOptions.fields regex to text property, not selector |
| | Text panels stuck on “Loading…” | "type": "text" panels are static—targets are never executed | Switch to "type": "table", showHeader: false, wrap: true | Use table panels for dynamic markdown-like content |
| | Nested selectors (intel.country) return nothing | Go backend treats selectors as literal keys, not paths | Flatten JSON server-side ("country": "US" instead of nested) | Flatten early when using backend-proxy datasources |
| Observability Pipeline Assumptions | Loki port query returns zero results | Assumed non-existent labels (action, log_type) — only job, etc. exist | Use line filters (\|~) + positional regexp; verify labels via API | Always check actual labels with /loki/api/v1/labels before writing LogQL |
| | Regexp captured TCP options, not dst_port | End-of-line anchor (\d+$) grabbed window scale value | Positional regexp anchored around field [21]: ,(?:tcp\|udp),...(?P<dst_port>\d+), | Use structure-aware (positional) regex for CSV/log formats |
| | All port metrics labeled service="threat-intel" | Scrape config service: 'threat-intel' overwrote metric-level label | Rename label to port_service | Avoid Prometheus-reserved label names (service, job, instance) |
| | Outbound src_ip always = WAN IP (NAT hiding internal sources) | pfSense WAN logging is post-NAT | Accept limitation; label as “pfSense NAT (x.x.x.x)”; instruct Claude to recommend Diagnostics > States | Know exactly where logs are captured relative to NAT |
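To make the positional-regexp fix concrete, here is a sketch of the pattern anchored on the protocol field. The sample line is illustrative of pfSense's IPv4 TCP filterlog layout (…, protocol name, length, src, dst, src-port, dst-port), not a verbatim capture, and the exact field order is an assumption worth verifying against your own logs:

```python
import re

# Anchor on ",tcp," / ",udp,": the next fields are length, src, dst,
# src_port, dst_port—so dst_port is captured positionally instead of
# with an end-of-line anchor (which grabbed TCP window-scale values).
FILTERLOG_PORTS = re.compile(
    r",(?:tcp|udp),\d+,(?P<src>[^,]+),(?P<dst>[^,]+),(?P<src_port>\d+),(?P<dst_port>\d+)"
)

# Illustrative filterlog-style CSV line (not a real capture):
sample = ("222,,,1000000103,igc0,match,block,in,4,0x0,,64,0,0,DF,6,"
          "tcp,60,198.51.100.9,192.0.2.1,54321,3389,0,S,12345,,64240,,mss")

m = FILTERLOG_PORTS.search(sample)
print(m.group("dst_port"))  # → 3389
```

The same pattern drops into a Loki `| regexp` stage; the point is that the anchor is structural (the protocol field) rather than positional from the end of the line.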

These lessons were expensive in time but saved far more in future debugging. If you’re building something similar, prioritize:

  • Verifying labels and datasource behavior early
  • Flattening JSON for Infinity
  • Global rate limiting for external APIs
  • Defensive LLM output parsing

Live Data Snapshot (2026-02-22, 1-hour window)

[Screenshot: Threat Intelligence dashboard]

Deployment Steps

First-time setup

```bash
# 1. Add API keys to .env
# ANTHROPIC_API_KEY, ABUSEIPDB_API_KEY, OTX_API_KEY, IPINFO_TOKEN, GREYNOISE_API_KEY

# 2. Build and start
docker compose up -d --build threat-intel

# 3. Restart VictoriaMetrics to reload prometheus.yml scrape config
docker compose restart victoriametrics

# 4. Force-recreate Grafana to install Infinity plugin + load new datasource
docker compose up -d --force-recreate grafana
```

After adding/changing API keys

```bash
docker compose up -d --force-recreate threat-intel
```

After changing app/ Python code

```bash
docker compose build threat-intel && docker compose up -d --force-recreate threat-intel
```

After changing dashboard JSON

Grafana auto-reloads provisioned dashboards every 30 seconds. No restart needed.

After changing datasource YAML

```bash
docker compose up -d --force-recreate grafana
```

Verification

```bash
# Service health
curl http://localhost:8001/health
# → {"status":"ok","report_available":true}

# Wait for enrichment (up to 60s after startup)
curl http://localhost:8001/api/infinity/summary

# Port analysis working
curl http://localhost:8001/api/infinity/ports | python3 -c "
import sys,json; d=json.load(sys.stdin)
print(f'{len(d)} ports detected')
for p in d[:5]: print(f'  port={p[\"port\"]:6} service={p[\"service\"]:15} risk={p[\"risk_level\"]}')"

# Suspicious outbound with source
curl http://localhost:8001/api/infinity/outbound_suspicious | python3 -m json.tool

# pfSense-specific recommendations
curl http://localhost:8001/api/infinity/actions | python3 -c "
import sys,json
for a in json.load(sys.stdin): print('•', a['action'])"

# VictoriaMetrics scraping port metrics with correct label
curl -s 'http://localhost:8428/api/v1/query?query=threat_intel_port_event_count' | python3 -c "
import sys,json
for m in json.load(sys.stdin)['data']['result'][:5]:
    print(m['metric'].get('port'), m['metric'].get('port_service'), m['metric'].get('risk_level'))"

# Redis cache populated
docker exec convergence-redis redis-cli KEYS 'threat-intel:ip:*' | wc -l

# Claude narrative
curl -s http://localhost:8001/api/report | python3 -c "
import sys,json; r=json.load(sys.stdin); n=r['narrative']
print('available:', n['available'], '| risk:', n.get('risk_level','N/A'))"

# Service logs
docker logs convergence-threat-intel --tail 50
```

Wrapping Up and Next Steps

Phase 4 turns firewall logs into AI-powered, pfSense-aware intelligence. No more guessing “is this bad?”: Claude tells you, with context and remediation steps.

Next up in Phase 5: event-driven automation, turning these high-risk alerts into automatic pfBlockerNG list additions and dynamic anomaly baselines.

If you have questions or suggestions, hit me up on X @byrn_baker or on LinkedIn. Stay tuned!

Need a real lab environment?

I run a small KVM-based lab VPS platform designed for Containerlab and EVE-NG workloads — without cloud pricing nonsense.

Visit localedgedatacenter.com →
This post is licensed under CC BY 4.0 by the author.