
Building Convergence – A Journey from Network Observability to AI-Driven Automation Part 5b: AI-Driven Automation Agent

This is the second half of the Phase 5 writeup. Part 5a covered the agent’s core: the polling loop, IP filtering, Claude’s action proposal, the rate limiter, and repeat offender tracking. This part covers everything that happens once a decision is made: the Discord bot approval flow, how pfSense blocks actually get applied across three different execution paths, the executor’s measure-act-verify-rollback cycle, and the GAIT audit trail that records every decision to an immutable git branch.

Code is on the phase5-automation-agent branch.


The Discord bot: five slash commands

The bot runs as a persistent outbound WebSocket connection to Discord’s Gateway API. This is a key design point for a homelab: you don’t need a public IP or an HTTPS endpoint to receive Discord interactions. The bot reaches out to Discord, not the other way around.

The ConvergenceBot class extends discord.Client and registers slash commands via app_commands.CommandTree. The commands are registered inside start_bot() as closures over the bot instance and state.pending_approvals, which is the in-memory dict of unexpired sessions:

# discord_bot.py

class ConvergenceBot(discord.Client):
    def __init__(self, guild_id: int | None = None) -> None:
        intents = discord.Intents.default()
        super().__init__(intents=intents)
        self.tree = app_commands.CommandTree(self)
        self._guild_id = guild_id

    async def setup_hook(self) -> None:
        """Called once after login — register slash commands with Discord."""
        if self._guild_id:
            guild = discord.Object(id=self._guild_id)
            self.tree.copy_global_to(guild=guild)
            await self.tree.sync(guild=guild)
        else:
            await self.tree.sync()
            # Global sync without a guild ID takes up to 1 hour to propagate

Setting DISCORD_GUILD_ID in your env matters for development: guild-scoped command syncs are instant, while global syncs can take up to an hour to show up in Discord. The bot also no-ops cleanly if DISCORD_BOT_TOKEN isn’t set; the service starts in webhook-only mode, so you can test the polling and analysis flow before adding a bot.

Session expiry

Every pending approval has a 4-hour expiry stored as an ISO timestamp:

# scheduler.py — needs_approval path

state.pending_approvals[session_id] = {
    "ip": ip,
    "threat_data": threat_data,
    "proposed_action": proposed,
    "pf_action": pf_action.to_dict(),
    "baseline": baseline,
    "created_at": datetime.now(timezone.utc).isoformat(),
    "expires_at": (datetime.now(timezone.utc) + timedelta(hours=4)).isoformat(),
}

Every bot command checks expiry before acting. The _get_pending() helper does this consistently:

# discord_bot.py

def _get_pending(session_id: str) -> dict | None:
    """Return pending session data only if it exists and has not expired."""
    data = state.pending_approvals.get(session_id)
    if not data:
        return None
    try:
        expires = datetime.fromisoformat(data["expires_at"].replace("Z", "+00:00"))
        if datetime.now(timezone.utc) > expires:
            return None
    except Exception:
        return None
    return data

If you /approve a session ID that expired two hours ago, you get a clean “not found” response. No stale executions.
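The .replace("Z", "+00:00") shim in that helper is doing quiet but necessary work: datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11 onward. A minimal, self-contained sketch of the expiry check (is_expired is an illustrative name, not from the repo):

```python
from datetime import datetime, timedelta, timezone

def is_expired(expires_at: str, now: datetime) -> bool:
    # fromisoformat() only understands a "Z" suffix on Python 3.11+;
    # the replace() shim keeps the check working on older interpreters.
    expires = datetime.fromisoformat(expires_at.replace("Z", "+00:00"))
    return now > expires

created = datetime(2026, 2, 25, 14, 30, tzinfo=timezone.utc)
expires_at = (created + timedelta(hours=4)).isoformat()

fresh = is_expired(expires_at, created + timedelta(hours=3))
stale = is_expired(expires_at, created + timedelta(hours=5))
print(fresh, stale)  # False True
```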

/pending — what’s waiting for review

The /pending command renders the approval queue as a Discord embed. Each unexpired session shows the IP, its threat score, the session ID (needed for /approve or /reject), and how many minutes until it expires:

@_bot.tree.command(name="pending", description="List pending automation approvals")
async def cmd_pending(interaction: discord.Interaction) -> None:
    await interaction.response.defer()
    await interaction.followup.send(embed=_pending_embed())

The embed builder iterates state.pending_approvals, filters to unexpired entries, and shows up to 10. If there are more than 10, a footer note says so. The defer() + followup.send() pattern is important — Discord requires a response within 3 seconds, and building the embed (especially if the dict is large) might take longer than that.

/approve <session_id> — approve and execute

The approve command has one subtle implementation detail that’s easy to get wrong: it pops the session from state.pending_approvals before starting execution, not after:

@_bot.tree.command(name="approve", description="Approve a pending pfSense block action")
@app_commands.describe(session_id="Session ID from the approval request")
async def cmd_approve(interaction: discord.Interaction, session_id: str) -> None:
    await interaction.response.defer()

    pending = _get_pending(session_id)
    if pending is None:
        await interaction.followup.send(embed=discord.Embed(
            title="❌ Not Found",
            description=f"Session `{session_id}` does not exist or has expired.",
            color=0xFF3333,
        ))
        return

    # Pop BEFORE firing the task — prevents a double-approval race condition
    state.pending_approvals.pop(session_id, None)
    m.automation_pending_approvals.set(len(state.pending_approvals))

    ip = pending["ip"]
    pa = pending["pf_action"]
    pf_action = PfBlockerAction(
        action_type=pa["type"],
        target_list=pa["target_list"],
        value=pa["value"],
        reason=pa["reason"],
        duration_hours=int(pa.get("duration_hours", settings.block_ttl_hours)),
    )

    approver = interaction.user.display_name if interaction.user else "Discord"
    await interaction.followup.send(embed=discord.Embed(
        title=f"✅ Approved — {ip}",
        description=(
            f"Action `{pa['type']}` on `{pa['value']}` approved by **{approver}**.\n"
            "Executing in the background — watch for the outcome notification."
        ),
        color=0x00CC66,
    ))

    asyncio.create_task(
        execute_and_verify(session_id, ip, pf_action, pending["baseline"],
                           pending["threat_data"], pending["proposed_action"], None)
    )

Pop-before-fire prevents a race where two operators both hit /approve on the same session_id within milliseconds of each other. The first pop removes it; the second call to _get_pending() returns None and gets the “not found” response. Without this, you’d fire the pfSense action twice.

The execution runs as a create_task() — the bot response returns immediately (“Executing in the background…”) and the actual pfSense call happens async. Discord’s 3-second response window makes this necessary; a live pfSense XML-RPC call can take 2–10 seconds.
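Stripped of Discord specifics, the shape of that pattern is easy to sketch (every name here is illustrative, not from the repo): reply fast, run the slow work as a task on the same event loop.

```python
import asyncio

results: list[str] = []
background: set[asyncio.Task] = set()

async def slow_pfsense_call(ip: str) -> str:
    await asyncio.sleep(0.01)  # stands in for a 2-10 s XML-RPC round trip
    return f"blocked {ip}"

async def run_block(ip: str) -> None:
    results.append(await slow_pfsense_call(ip))

async def handle_approve(ip: str) -> str:
    # Fire-and-forget: the reply is returned within Discord's 3-second
    # window while the slow call keeps running in the background.
    task = asyncio.create_task(run_block(ip))
    background.add(task)                       # keep a strong reference
    task.add_done_callback(background.discard)
    return "Executing in the background"

async def main() -> None:
    reply = await handle_approve("203.0.113.9")
    assert "background" in reply   # reply came back immediately...
    assert results == []           # ...before the slow call finished
    await asyncio.gather(*background)

asyncio.run(main())
print(results)
```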

/approve-all — bulk approve

/approve-all approves every unexpired session in one command. The implementation mirrors /approve but iterates all pending entries. Critically, it pops all sessions atomically before firing any tasks — not one-at-a-time — so there’s no window where a concurrent /approve <session_id> could double-fire an already-queued entry:

@_bot.tree.command(name="approve-all")
async def cmd_approve_all(interaction: discord.Interaction) -> None:
    await interaction.response.defer()

    pending_items: list[tuple[str, dict]] = []
    for sid, data in list(state.pending_approvals.items()):
        try:
            expires = datetime.fromisoformat(data["expires_at"].replace("Z", "+00:00"))
            if datetime.now(timezone.utc) > expires:
                continue
        except Exception:
            continue
        pending_items.append((sid, data))

    # Human-initiated approvals bypass the automated rate limit.
    # MAX_ACTIONS_PER_HOUR is a safety cap for unattended auto-approve runs,
    # not for a human consciously reviewing a queue.
    to_approve = pending_items

    # Pop ALL sessions before firing ANY tasks
    for sid, _ in to_approve:
        state.pending_approvals.pop(sid, None)
    m.automation_pending_approvals.set(len(state.pending_approvals))

    # Fire execution tasks — serialised inside pfblocker by _xmlrpc_write_lock
    for sid, pending in to_approve:
        ip = pending["ip"]
        pa = pending["pf_action"]
        pf_action = PfBlockerAction(...)
        asyncio.create_task(execute_and_verify(sid, ip, pf_action, ...))

    await interaction.followup.send(embed=discord.Embed(
        title=f"✅ Bulk Approval — {len(to_approve)} Approved",
        ...
    ))

The comment about MAX_ACTIONS_PER_HOUR is worth expanding on. Early on, the /approve-all command was subject to the same hourly cap as the automated scheduler. So if the scheduler had already auto-approved 4 IPs that hour and you had 26 pending, /approve-all would respond: “4/26 Approved, 22 skipped (rate limit).” That’s wrong. The rate limit exists to protect against the scheduler going rogue and burning through your pfSense alias capacity overnight. A human explicitly reviewing each proposal has already exercised judgment — the rate limit is not the right control for that path.

/reject <session_id> and /reject-all

Rejection is simpler: pop the session, increment the skipped metric, send a confirmation embed. No pfSense call, no GAIT update beyond what was already committed when the proposal was created.

@_bot.tree.command(name="reject")
async def cmd_reject(interaction: discord.Interaction, session_id: str) -> None:
    await interaction.response.defer()

    pending = _get_pending(session_id)
    if pending is None:
        # not found or expired
        ...
        return

    state.pending_approvals.pop(session_id, None)
    m.automation_pending_approvals.set(len(state.pending_approvals))
    m.automation_actions_total.labels(status="skipped").inc()

    rejecter = interaction.user.display_name if interaction.user else "Discord"
    await interaction.followup.send(embed=discord.Embed(
        title=f"🚫 Rejected — {pending['ip']}",
        description=f"Rejected by **{rejecter}**. No changes will be made to pfSense.",
        color=0xFF3333,
    ))

/reject-all iterates all unexpired sessions in a single pass and pops them all before sending the response — same atomic-pop pattern as /approve-all.


pfSense execution: three paths, one waterfall

The block actually reaches pfSense through execute_pfblocker_add() in pfblocker.py. The design is a waterfall: try Path A (REST API), then Path B (XML-RPC exec_php), then Path C (SSH pfctl). First success wins. Unconfigured paths raise NotImplementedError and are silently skipped:

# pfblocker.py — execute_pfblocker_add()

for attempt_fn, label in [
    (_rest_api_add, "rest_api"),
    (_xmlrpc_add, "xmlrpc"),
    (_ssh_add, "ssh"),
]:
    try:
        attempt_result = await attempt_fn(action)
        if attempt_result.get("success"):
            result.update(attempt_result)
            result["method"] = label
            logger.info("pfSense action succeeded via %s: %s → %s",
                        label, action.value, action.target_list)
            return result
        logger.warning("%s attempt failed: %s — trying next method",
                       label, attempt_result.get("message"))
    except NotImplementedError:
        logger.debug("%s not configured; skipping", label)
    except Exception as exc:
        logger.warning("%s attempt raised: %s — trying next method", label, exc)

# All paths failed
result["message"] = (
    "All execution paths failed. Set PFSENSE_XMLRPC_PASS for XML-RPC, "
    "PFSENSE_API_KEY for REST API, or PFSENSE_SSH_KEY_PATH for SSH."
)

Before any live execution, DRY_RUN and PFSENSE_HOST are checked as hard stops:

if settings.dry_run:
    logger.info("[DRY-RUN] Would add %s to '%s'", action.value, action.target_list)
    result["success"] = True
    result["method"] = "dry_run"
    return result

if not settings.pfsense_host:
    result["message"] = "PFSENSE_HOST is not configured."
    logger.error("pfSense host not configured; refusing live execution")
    return result

This means dry-run mode and unconfigured-host mode both fail safely before any network calls happen.

Path A — REST API v2

Path A uses pfSense Plus 25.11’s REST API. If you have PFSENSE_API_KEY set, it posts to /api/v2/firewall/alias/entry with a Bearer token. Requires creating a Host alias in Firewall → Aliases and a block rule referencing that alias. The API is the cleanest path if you have pfSense Plus — no PHP exec involved.

Path B — XML-RPC exec_php (the transport problem)

Path B is what most people will use. It hits /xmlrpc.php on pfSense and runs PHP code directly via exec_php. No API key is needed — just the admin username and password.

The transport problem that killed several hours of debugging: Python’s built-in xmlrpc.client can’t parse pfSense’s responses. pfSense prepends PHP echo output to the HTTP response body before the XML-RPC envelope. So the raw HTTP body looks like this:

(any PHP echo output from exec_php)
<?xml version="1.0"?>
<methodResponse>
  <params><param><value><string>...</string></value></param></params>
</methodResponse>

xmlrpc.client tries to parse from byte 0, hits whatever PHP printed, and throws an ExpatError. The fix is to drop the high-level library entirely and use httpx directly:

# pfblocker.py — _xmlrpc_exec_php()

xml_body = (
    '<?xml version="1.0"?>'
    '<methodCall><methodName>pfsense.exec_php</methodName>'
    '<params><param>'
    f'<value><string>{xml_escape(php_code)}</string></value>'
    '</param></params></methodCall>'
)

async with httpx.AsyncClient(
    verify=settings.pfsense_verify_ssl,
    timeout=_XMLRPC_TIMEOUT,
) as client:
    resp = await client.post(
        url,
        content=xml_body.encode("utf-8"),
        headers={"Content-Type": "text/xml; charset=utf-8"},
        auth=(settings.pfsense_xmlrpc_user, settings.pfsense_xmlrpc_pass),
    )

if resp.status_code == 401:
    raise RuntimeError("Authentication failed — check credentials")
if not resp.is_success:
    raise RuntimeError(f"HTTP {resp.status_code}: {resp.text[:200]}")

# Split PHP echo output from the XML envelope
xml_start = resp.text.find("<?xml")
if xml_start == -1:
    raise RuntimeError(f"No XML envelope in response: {resp.text[:200]}")
echo_output = resp.text[:xml_start].strip() if xml_start > 0 else ""
xml_part = resp.text[xml_start:]

# Check for XML-RPC fault
root = ET.fromstring(xml_part)
if root.find(".//fault") is not None:
    # parse fault code and string from struct members
    raise RuntimeError(f"XML-RPC fault {code_val}: {msg_val}")

return echo_output

The auth=(user, pass) tuple in httpx sends Authorization: Basic ... correctly. Encoding credentials in the URL (https://user:pass@host) works until someone’s password contains @ or /.

The xml_escape() call on the PHP code string is also important — without it, any < or & in the PHP would corrupt the XML body.
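Assuming xml_escape is the standard library’s xml.sax.saxutils.escape (or an equivalent), the round trip is easy to demonstrate: the PHP’s special characters get entity-encoded going in, and the XML parser hands them back verbatim.

```python
import xml.etree.ElementTree as ET
from xml.sax.saxutils import escape as xml_escape

# PHP containing characters that are significant in XML: <, >, &
php_code = 'if ($ip < $max && $ok) { echo "<done>"; }'

xml_body = (
    '<?xml version="1.0"?>'
    '<methodCall><methodName>pfsense.exec_php</methodName>'
    '<params><param>'
    f'<value><string>{xml_escape(php_code)}</string></value>'
    '</param></params></methodCall>'
)

# The escaped body parses cleanly, and the parser un-escapes the
# entities, so the PHP arrives at pfSense exactly as written.
root = ET.fromstring(xml_body)
recovered = root.findtext(".//string")
print(recovered == php_code)  # True
```

Without the escape step, the first < in the PHP would be read as the start of a tag and ET.fromstring() would raise a ParseError.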

The race condition in XML-RPC writes

Once the transport worked, /approve-all had a different bug: only one IP would actually appear in the pfSense alias after bulk approval, even though every call returned success.

The root cause: pfSense’s alias write isn’t transactional. The PHP behind exec_php reads the current alias config, appends the new IP, writes it back. When 20 asyncio.create_task() calls fire concurrently, each one reads the same initial state (empty alias), appends its IP, and writes — last write wins.

The fix is a module-level asyncio.Lock that serializes all alias writes:

# pfblocker.py

import asyncio as _asyncio
_xmlrpc_write_lock = _asyncio.Lock()

async def _xmlrpc_alias_add(ip_cidr: str) -> dict:
    async with _xmlrpc_write_lock:
        echo_out = await _xmlrpc_exec_php(php_code)
    return {"success": True, "message": echo_out or "ok"}

All alias writes — _xmlrpc_alias_add and _xmlrpc_alias_delete — hold this lock. Each completes the full read-modify-write before the next begins. After deploying the fix, /approve-all on 20 IPs added all 20.

Path C — SSH pfctl

Path C connects via Paramiko SSH and runs pfctl -T add. It’s an emergency fallback for when the web interface isn’t available. One important caveat built into the code: SSH-based pfctl changes are runtime-only — they don’t survive a pfSense reboot or config reload. The comment in the code explicitly calls this out so you don’t get surprised when your block disappears after a firmware update.


The executor: measure, act, wait, verify, rollback

execute_and_verify() in executor.py is the terminal step for every live action, whether it was auto-approved or human-approved via Discord. It has a clear linear flow:

# executor.py — execute_and_verify() (structure)

# 1. Execute
result = await execute_pfblocker_add(pf_action)
_record(session, "execution_result", result)

if not result.get("success"):
    rollback_result = await rollback_pfblocker_add(pf_action)
    _record(session, "rollback_result", rollback_result)
    # send failure notification, close session
    return

# 2. Wait for pfBlockerNG to reload its lists
await asyncio.sleep(300)   # 5 minutes

# 3. Verify
verification = await verify_action(ip, baseline, wait_minutes=5)
_record(session, "verification", verification)

# 4. Finish — record metrics, mark IP processed, increment block count
await record_action_taken()
await mark_ip_processed(ip, ttl_hours=pf_action.duration_hours)
total_blocks = await increment_block_count(ip)

# 5. Send outcome notification
await send_action_outcome(session_id, ip, success=True, outcome_message=outcome_msg)
session.close("success", success=True)

The 5-minute sleep is necessary and non-negotiable for pfBlockerNG. pfBlockerNG works by syncing its block lists to pfSense’s firewall tables on a schedule. After you modify the underlying custom list, it doesn’t take effect immediately — pfBlockerNG needs to detect the change and reload. In testing, propagation took 2–4 minutes consistently; 5 minutes is the safe margin.

The rollback_pfblocker_add() function mirrors execute_pfblocker_add() — same three paths (REST API → XML-RPC → SSH), same waterfall — but calls the delete/remove operation instead of add. It also holds _xmlrpc_write_lock, so a rollback doesn’t race with another concurrent add.

Baseline and verification

Before any action fires, capture_baseline() snapshots four VictoriaMetrics metrics for the IP:

# baseline.py

_QUERIES: dict[str, str] = {
    "inbound_block_events": (
        'sum(increase(otelcol_pfsense_filterlog_total'
        '{{action="block",direction="in",src_ip=~".*{ip}.*"}}[{window}m])) or vector(0)'
    ),
    "outbound_pass_events": (
        'sum(increase(otelcol_pfsense_filterlog_total'
        '{{action="pass",direction="out",dst_ip=~".*{ip}.*"}}[{window}m])) or vector(0)'
    ),
    "threat_intel_score":    'threat_intel_ip_score{{ip="{ip}"}}',
    "known_bad_actor_flag":  'threat_intel_known_bad_actor{{ip="{ip}"}}',
}

async def capture_baseline(ip: str, lookback_minutes: int = 60) -> dict:
    async with httpx.AsyncClient() as client:
        for metric_name, query_template in _QUERIES.items():
            query = query_template.format(ip=ip, window=lookback_minutes)
            value = await _query_single(client, query)
            baseline["metrics"][metric_name] = value
    return baseline
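The doubled braces in those templates are not PromQL syntax: the strings go through str.format(), so {{ and }} are the escapes for literal braces, while {ip} and {window} are substituted. A quick check of one template:

```python
# Same template shape as the inbound_block_events query above.
template = (
    'sum(increase(otelcol_pfsense_filterlog_total'
    '{{action="block",direction="in",src_ip=~".*{ip}.*"}}[{window}m])) or vector(0)'
)
query = template.format(ip="203.0.113.9", window=60)
print(query)
# sum(increase(otelcol_pfsense_filterlog_total{action="block",direction="in",src_ip=~".*203.0.113.9.*"}[60m])) or vector(0)
```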

After the 5-minute wait, verify_action() calls capture_baseline() again with a 5-minute window and compares:

# baseline.py — verify_action()

for key in pre_metrics:
    pre_val = pre_metrics.get(key)
    post_val = post_metrics.get(key)
    if pre_val is not None and post_val is not None:
        pct_change = ((post_val - pre_val) / pre_val) * 100 if pre_val > 0 else 0.0
        comparison[key] = {
            "before": pre_val,
            "after": post_val,
            "pct_change": round(pct_change, 1),
            "reduced": post_val < pre_val,
        }

# Heuristic: effective if inbound block events didn't increase
inbound_before = pre_metrics.get("inbound_block_events") or 0
inbound_after  = post_metrics.get("inbound_block_events") or 0
action_appears_effective = inbound_after <= inbound_before

The verification is explicitly informational, and there’s a counterintuitive wrinkle worth calling out. When pfBlockerNG is working correctly, you’d expect inbound_block_events to decrease after adding an IP to the block list — not increase — because pfBlockerNG drops traffic at a higher layer than pf, before pf would log a block event. So a “decrease in block events” can actually indicate success, not failure. The verification result is committed to GAIT regardless, so you have the data to review manually.
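Pulled out as a standalone function, the comparison step behaves like this (compare_metrics is an illustrative repackaging, not the repo’s exact signature):

```python
def compare_metrics(pre: dict, post: dict) -> tuple[dict, bool]:
    """Illustrative repackaging of verify_action()'s comparison logic."""
    comparison: dict = {}
    for key, pre_val in pre.items():
        post_val = post.get(key)
        if pre_val is not None and post_val is not None:
            pct = ((post_val - pre_val) / pre_val) * 100 if pre_val > 0 else 0.0
            comparison[key] = {
                "before": pre_val,
                "after": post_val,
                "pct_change": round(pct, 1),
                "reduced": post_val < pre_val,
            }
    # Heuristic from the post: effective if inbound blocks didn't increase
    effective = (post.get("inbound_block_events") or 0) <= \
                (pre.get("inbound_block_events") or 0)
    return comparison, effective

pre = {"inbound_block_events": 40.0, "outbound_pass_events": 3.0}
post = {"inbound_block_events": 2.0, "outbound_pass_events": 0.0}
comparison, effective = compare_metrics(pre, post)
print(comparison["inbound_block_events"]["pct_change"], effective)  # -95.0 True
```

Note how a sharp drop in inbound block events still reads as effective = True under the `<=` heuristic, which is exactly the wrinkle described above.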


GAIT — Git Audit/Immutable Trail

This is the part I’m most proud of, and the part I’d recommend most strongly to anyone building AI automation for anything that matters.

The core problem with AI-driven automation is accountability. When the agent blocks an IP, you need to be able to answer: what data triggered this? What exactly did we ask Claude? What did Claude say? Was there a human approval? Did the block actually work? If someone asks you about it six months later, you need a complete answer.

Logs don’t fully cut it. Logs are append-only text — they can be rotated away, they don’t have structure, and they don’t capture the full context of a decision in one place. What GAIT does instead is commit a structured JSON file after every meaningful step of every session to a dedicated git branch — one branch per IP per decision. The branch is never rebased, never force-pushed. It just accumulates commits and lives in a Docker volume forever.

The implementation

Everything lives in audit/git_trail.py. Two classes: GitAuditTrail (the singleton that manages the repo) and AuditSession (one open session on one branch).

GitAuditTrail.initialize() runs on startup. If the repo already exists at /app/audit-repo, it opens it. If not, it creates it, writes a README explaining the branch structure, and makes the first commit:

# git_trail.py — GitAuditTrail.initialize()

audit_path = Path(settings.audit_repo_path)
audit_path.mkdir(parents=True, exist_ok=True)

if (audit_path / ".git").exists():
    self._repo = git.Repo(str(audit_path))
    logger.info("GAIT: opened existing audit repo at %s (commits=%d)",
                audit_path, len(list(self._repo.iter_commits())))
    return

# First-time init
self._repo = git.Repo.init(str(audit_path))
self._configure_git_identity()
# write README.md documenting the branch structure, commit it

open_session() creates the branch. Every session branches from main, so each branch is an independent fork with its own linear history — not entangled with other sessions:

# git_trail.py — GitAuditTrail.open_session()

branch_name = f"automation-{_sanitize_branch(session_id)}"
# e.g. "automation-20260225-143021-1-2-3-4"

# Always start from main so branches are independent
self._repo.git.checkout(self._main_branch)

# Guard against duplicate names (timestamp in session_id makes this rare)
if branch_name in [h.name for h in self._repo.heads]:
    branch_name = f"{branch_name}-{int(time.time())}"

new_branch = self._repo.create_head(branch_name)
new_branch.checkout()

session_dir = Path(self._repo.working_dir) / "sessions" / branch_name
session_dir.mkdir(parents=True, exist_ok=True)

return AuditSession(repo=self._repo, branch=branch_name,
                    session_dir=session_dir, session_id=session_id)

AuditSession.record_turn() is called after every step. It writes a sequentially numbered JSON file and immediately commits it:

# git_trail.py — AuditSession.record_turn()

def record_turn(self, name: str, data: Any, as_text: bool = False) -> Path:
    idx = f"{self._turn_counter:02d}"
    self._turn_counter += 1
    ext = ".txt" if as_text else ".json"
    filename = f"{idx}_{name}{ext}"
    fpath = self.session_dir / filename

    if as_text:
        fpath.write_text(str(data), encoding="utf-8")
    else:
        fpath.write_text(json.dumps(data, indent=2, default=str), encoding="utf-8")

    rel = str(fpath.relative_to(Path(self.repo.working_dir)))
    self.repo.index.add([rel])
    self.repo.index.commit(
        f"[{self.session_id}] turn: {name}",
        author=git.Actor(settings.audit_git_user_name, settings.audit_git_user_email),
        committer=git.Actor(settings.audit_git_user_name, settings.audit_git_user_email),
    )
    m.automation_audit_commits_total.inc()
    return fpath

The as_text=True path exists for the Claude prompt — prompts are plain text, not JSON, and storing them as .txt makes them more readable in git show or cat.

AuditSession.close() writes the final turn and logs the seal:

def close(self, outcome: str, success: bool) -> None:
    self.record_turn("outcome", {
        "outcome": outcome,
        "success": success,
        "closed_at": datetime.now(timezone.utc).isoformat(),
        "session_id": self.session_id,
        "branch": self.branch,
    })

What gets committed, and when

Two code paths contribute turns to each session — the scheduler records the decision side, the executor records the action side:

| File | Content | When |
| --- | --- | --- |
| 00_input.json | Threat data, config snapshot, block count, session ID | Before analysis |
| 01_baseline.json | VictoriaMetrics counters before any action | Before analysis |
| 02_claude_prompt.txt | Exact prompt string sent to Claude | Before API call |
| 03_proposed_action.json | Claude’s raw JSON response + token counts | After API call |
| 04_decision.json | dry_run / pending_approval / auto_approve + reason | After decision gate |
| 05_execution_result.json | pfSense API call result + method used | After pfSense call |
| 06_verification.json | Post-action metrics comparison | After 5-min wait |
| 07_outcome.json | Final sealed outcome | Session close |

When a session goes through human approval, the approval endpoint re-opens the branch and adds an approval.json turn before execution begins:

# main.py — POST /api/automation/approve/{session_id}

session = trail.open_session(ip, f"{session_id}-approved")
session.record_turn(
    "approval",
    {
        "approved_at": datetime.now(timezone.utc).isoformat(),
        "approved_via": "api",
        "original_session_id": session_id,
    },
)

Design decisions

Non-blocking failures. Every GAIT call is wrapped in a try-except that logs the error and continues. An audit trail failure should never stop the actual security action. The security action is primary; the audit trail is secondary.

Independent branches. Branching from main every time means sessions can’t interfere with each other. If two sessions happen to run concurrently (rare, but possible during /approve-all), they’re on different branches and their commits don’t conflict.

Never rebased, never force-pushed. This is what makes the trail legally defensible. A git branch that has only been appended to — no history rewrites — can be verified as unmodified. Every commit has a cryptographic hash; any tampering would change the hashes.
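The tamper-evidence property is the same one any hash chain has. A toy model (plain SHA-1 over JSON, standing in for git’s commit objects, which hash their parent plus their tree):

```python
import hashlib
import json

def commit_hash(parent: str, payload: dict) -> str:
    # Each "commit" hash covers its parent hash, so editing any earlier
    # turn changes every hash after it.
    blob = json.dumps({"parent": parent, "payload": payload}, sort_keys=True)
    return hashlib.sha1(blob.encode()).hexdigest()

def build_chain(turns: list[dict]) -> list[str]:
    chain, parent = [], "root"
    for t in turns:
        parent = commit_hash(parent, t)
        chain.append(parent)
    return chain

turns = [
    {"turn": "00_input", "ip": "203.0.113.9"},
    {"turn": "03_proposed_action", "action": "block"},
    {"turn": "07_outcome", "success": True},
]
chain = build_chain(turns)

# Tamper with the first turn and rebuild: every hash diverges.
turns[0]["ip"] = "198.51.100.1"
tampered = build_chain(turns)
print(chain[0] != tampered[0], chain[-1] != tampered[-1])  # True True
```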

Thread safety via APScheduler. The scheduler runs with max_instances=1, which means only one poll cycle can run at a time. Git operations aren’t thread-safe in GitPython, but this constraint means they won’t run concurrently through the scheduler path. The approval endpoint is the one case where concurrent GAIT writes could happen (multiple /approve calls simultaneously), but approval sessions re-open branches rather than sharing one, so they don’t conflict.

Querying the audit trail

Directly with git:

# List all sessions
git -C /path/to/audit-repo branch -a

# Review a specific session's decision
git -C /path/to/audit-repo checkout automation-20260225-143021-1-2-3-4
cat sessions/automation-20260225-143021-1-2-3-4/03_proposed_action.json

# Find all sessions where Claude recommended a permanent block
git -C /path/to/audit-repo grep "recommend_permanent_block" --all

# What happened last week?
git -C /path/to/audit-repo log --oneline --all --since="1 week ago"

Via the REST API:

curl http://localhost:8002/api/automation/audit
# Returns: { "sessions": [ { "branch": "...", "last_commit": "...", "committed_at": "..." }, ... ] }

The Grafana Automation dashboard reads this endpoint via the Infinity datasource and renders it as a table — branch name, last commit message, timestamp. Each row is one IP decision, one git branch, one audit trail.


Bugs and hard-won lessons

xmlrpc.client parse failure. The lesson here is: when a library fails against a specific server, capture the raw HTTP response before assuming the library is correct. pfSense prepending PHP output to XML-RPC responses is non-standard but documented. The solution was dropping down to httpx and parsing the response manually. If I’d looked at the raw response first, I’d have saved a few hours.

Alert spam. The mark_ip_processed() call was missing from the needs_approval branch. It was written for the auto-execute and dry-run paths but the approval branch was added later without it. The lesson: any code path that produces an external side effect (Discord notification, pfSense change, email) needs to be explicitly paired with its dedup/idempotency guard. Don’t add the side effect path without also adding the guard.

Rate limit blocking human approvals. This was a product logic error rather than a code bug. I conflated “rate limit” with “safety cap” and didn’t think clearly about the two different actors that would trigger actions: the automated scheduler (needs a rate limit) and a human operator using Discord (should not be rate-limited). The fix was simple once the distinction was clear.

The concurrent write race. /approve-all worked fine on small queues in testing, where the time between concurrent task executions was large enough for the read-modify-write to complete. The race only surfaced with larger queues (20+ IPs) in production. The asyncio.Lock is simple and correct. The deeper lesson: any async code that does read-modify-write on shared mutable state — whether that state is in Redis, in a file, or in a remote API — needs a lock.


Wrapping up

Phase 5 is the piece that makes everything else actionable. The threat intelligence pipeline from Part 4 generates signal. The automation agent acts on that signal — with a human in the loop for anything ambiguous, and a complete auditable record for everything.

The combination I’m most satisfied with is GAIT plus the approval gate. Neither is technically complex — a git repo and a Discord bot. But together they mean every action this system takes is explainable, reversible (the block TTL expires automatically), and forensically documented in a way that could survive a security audit or an incident review months later.

For a homelab this might feel like overkill. But if you’re running this in any environment where you’d have to explain a blocking decision to someone else — or to yourself six months later — the audit trail pays for itself the first time you need it.

Looking at Phase 6: dynamic baselines using VictoriaMetrics’ outlier_iqr_over_time() to replace the fixed-threshold alerts, and possibly multi-site federation for a second pfSense instance. Let me know what direction you’d find most interesting.

Ideas or homelab war stories? Find me on X @byrn_baker.

Code: https://github.com/byrn-baker/Convergence/tree/phase5-automation-agent

Thanks for reading!

Need a real lab environment?

I run a small KVM-based lab VPS platform designed for Containerlab and EVE-NG workloads — without cloud pricing nonsense.

Visit localedgedatacenter.com →
This post is licensed under CC BY 4.0 by the author.