MALTE WAGENBACH, 22 Feb 2026 12:52:02

How I Built an AI Monitoring System

February 22, 2026

I run a dozen AI workflows across multiple projects. Outreach personalization, lead research, email drafting, content summarization, quality scoring. For months I treated them like black boxes: send prompt in, get output out, move on. That worked until a $340 API bill showed up on a Sunday night because one workflow had been retrying on failures and quietly looping for six hours. I had no idea it was happening.

That was the moment I stopped treating observability as optional.

Why Monitoring Matters Before You Scale

Running multiple AI workflows without visibility is the equivalent of running a web server with no logs and no uptime monitoring. You only find out something is wrong when it is very wrong.

The problems compound in three distinct ways. First, costs spike without warning. LLM APIs charge per token. A workflow that works fine at 100 runs per day does not automatically behave the same at 1,000. Prompt bloat, retries on failure, and accidental recursion all multiply cost in ways that are invisible until you get the bill.

Second, quality degrades silently. Model providers update their models. Prompts that worked in November start producing subtly worse outputs in February because the model changed. Without any quality signal logged alongside your calls, you have no way to detect this drift. Users notice before you do, which is the worst possible way to find out.

Third, you cannot make good architectural decisions without data. Is that GPT-4 call actually necessary, or would GPT-4o-mini do the same job? You cannot answer that question with instinct. You need latency distributions, output quality comparisons, and cost deltas. Without logging, every model selection decision is a guess.

The surprise bill is usually what triggers people to build this. For me it was six hours of a runaway workflow. For someone else it might be noticing that a workflow that appeared to produce good outputs has been silently failing 30% of the time, returning cached or malformed responses that nobody caught because there was no error tracking.

What to Actually Monitor

Not all metrics are equally useful. Here is what I settled on after iterating.

Token usage per workflow and per project is the foundation. You need to know how many prompt tokens and completion tokens each workflow consumes, broken down by workflow name and project. This is the raw material for all cost calculations.

Cost per run and cost per outcome are different things. Cost per run tells you what a single execution costs. Cost per outcome tells you the unit economics. For an outreach workflow, cost per outcome is cost per email sent. For a lead research workflow, it is cost per lead enriched. This framing forces you to think about whether the AI spend is justified by the business result.
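A minimal sketch of the distinction, assuming the outcome count (emails sent, leads enriched) is tracked somewhere outside the llm_logs table described later in this post and passed in by hand. The function name and the `outcomes` parameter are illustrative, not part of the system itself.

```python
import sqlite3
from typing import Optional, Tuple


def cost_per_run_and_outcome(
    conn: sqlite3.Connection, workflow: str, outcomes: int
) -> Tuple[float, Optional[float]]:
    """Return (cost per run, cost per outcome) for one workflow.

    `outcomes` is an assumption: a count of business results that you
    track yourself. Every logged row counts as a run, including failures
    and retries, because those are billed too.
    """
    total_cost, runs = conn.execute(
        "SELECT COALESCE(SUM(cost_usd), 0.0), COUNT(*) "
        "FROM llm_logs WHERE workflow = ?",
        (workflow,),
    ).fetchone()
    per_run = total_cost / runs if runs else 0.0
    # With zero outcomes there is no meaningful unit cost, so return None.
    per_outcome = total_cost / outcomes if outcomes else None
    return per_run, per_outcome
```

Four runs at $0.02 each that produce two sent emails come out at $0.02 per run but $0.04 per email, which is the number that matters when deciding whether the workflow pays for itself.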

Latency at p50 and p95 matters because averages lie. A workflow with a 2 second average and a 25 second p95 has a tail latency problem that affects real users and downstream processes. Median latency tells you what normal feels like. p95 tells you how bad the bad cases are.
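To make the point about averages concrete, here is a rough sketch that pulls latencies for one workflow from the llm_logs table described later in this post and computes the mean next to nearest-rank percentiles in Python. The helper names are hypothetical, and nearest-rank is just one of several percentile definitions.

```python
import sqlite3


def percentile(sorted_values, pct):
    """Nearest-rank percentile; sorted_values ascending, pct an integer 1-100."""
    if not sorted_values:
        raise ValueError("no latency samples")
    rank = (pct * len(sorted_values) + 99) // 100  # integer ceil(pct * n / 100)
    return sorted_values[rank - 1]


def latency_summary(conn: sqlite3.Connection, workflow: str):
    """Return mean, p50 and p95 latency in ms, or None if nothing is logged."""
    rows = conn.execute(
        "SELECT latency_ms FROM llm_logs WHERE workflow = ? ORDER BY latency_ms",
        (workflow,),
    ).fetchall()
    values = [row[0] for row in rows]
    if not values:
        return None
    return {
        "mean": sum(values) / len(values),
        "p50": percentile(values, 50),
        "p95": percentile(values, 95),
    }
```

With eight calls at 1,000 ms and two at 25,000 ms, the mean is 5,800 ms, a figure no actual call came close to, while p50 (1,000 ms) and p95 (25,000 ms) describe the normal case and the tail separately.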

Error rates by workflow break down into API failures (rate limits, timeouts, auth errors), malformed outputs (the model returned something that failed your parsing or validation), and retries. A workflow retrying 40% of the time looks fine in your success count but is costing you 40% more than it should.

Quality scores, where applicable, should be logged alongside every call. This does not have to be sophisticated. A simple pass/fail based on your output validation, or a 1-5 score if you have a rubric, logged at call time, gives you a quality trend you can actually track. Without this, the observation that outputs seem worse lately is unfalsifiable.
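As a sketch of what tracking that trend could look like, the query below averages scores per calendar month for one workflow. It assumes the quality_score column of the llm_logs table described later in this post is filled in by your own validation step; the function name is hypothetical.

```python
import sqlite3


def monthly_quality(conn: sqlite3.Connection, workflow: str):
    """Return (month, scored_calls, avg_score) rows, oldest month first.

    Rows without a quality score are ignored rather than counted as zero.
    """
    return conn.execute(
        """
        SELECT
            strftime('%Y-%m', timestamp) AS month,
            COUNT(quality_score) AS scored_calls,
            ROUND(AVG(quality_score), 2) AS avg_score
        FROM llm_logs
        WHERE workflow = ? AND quality_score IS NOT NULL
        GROUP BY month
        ORDER BY month
        """,
        (workflow,),
    ).fetchall()
```

A drop from one month to the next with no prompt change on your side is the kind of drift signal that would otherwise stay a hunch.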

Model usage breakdown answers the question of whether you are using the right tool for each job. Knowing that 60% of your token spend is going to GPT-4 when most of those calls do not require frontier model capability is the kind of insight that cuts your monthly bill significantly.
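One way to get that breakdown from the llm_logs table described later in this post is sketched below. The function name and the 30-day default window are assumptions for illustration.

```python
import sqlite3


def spend_share_by_model(conn: sqlite3.Connection, days: int = 30):
    """Return (model, cost_usd, share_pct) rows, most expensive model first."""
    rows = conn.execute(
        """
        SELECT model, SUM(cost_usd) AS cost
        FROM llm_logs
        WHERE date(timestamp) >= date('now', ?)
        GROUP BY model
        ORDER BY cost DESC
        """,
        (f"-{days} days",),
    ).fetchall()
    # Guard against dividing by zero when nothing was spent in the window.
    total = sum(cost for _, cost in rows) or 1.0
    return [(model, cost, round(100.0 * cost / total, 1)) for model, cost in rows]
```

The share column is what turns a list of costs into a decision: a single model taking most of the spend is the first place to look for calls that a cheaper model could handle.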

Volume trends catch runaway workflows before they become billing events. If a workflow normally runs 50 times per day and today it ran 800 times, that is an alert, not a data point to ignore.
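A rough sketch of that comparison against the llm_logs table described later in this post: today's call count per workflow versus its average over the previous seven days. The function name and the 5x factor are assumptions, and the baseline only averages days that had at least one call.

```python
import sqlite3


def volume_spikes(conn: sqlite3.Connection, factor: float = 5.0):
    """Return (workflow, calls_today, baseline) where today exceeds factor x baseline.

    Workflows with no history in the previous seven days have no baseline
    and are skipped here; they would need a separate absolute threshold.
    """
    rows = conn.execute(
        """
        WITH daily AS (
            SELECT workflow, date(timestamp) AS day, COUNT(*) AS calls
            FROM llm_logs
            WHERE date(timestamp) >= date('now', '-7 days')
            GROUP BY workflow, date(timestamp)
        )
        SELECT
            workflow,
            SUM(CASE WHEN day = date('now') THEN calls ELSE 0 END) AS today,
            AVG(CASE WHEN day < date('now') THEN calls END) AS baseline
        FROM daily
        GROUP BY workflow
        """
    ).fetchall()
    return [
        (workflow, today, baseline)
        for workflow, today, baseline in rows
        if baseline and today > factor * baseline
    ]
```

A relative check like this adapts to each workflow's normal volume, so a workflow that usually runs 50 times a day gets flagged at 800 without a hand-tuned limit.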

The Architecture

The core idea is simple: every LLM call goes through a logging wrapper that captures everything you care about before and after the actual API call.

Here is the SQLite schema I settled on:

CREATE TABLE llm_logs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    timestamp TEXT NOT NULL DEFAULT (datetime('now')),
    workflow TEXT NOT NULL,
    project TEXT NOT NULL,
    model TEXT NOT NULL,
    prompt_tokens INTEGER NOT NULL,
    completion_tokens INTEGER NOT NULL,
    total_tokens INTEGER NOT NULL,
    cost_usd REAL NOT NULL,
    latency_ms INTEGER NOT NULL,
    success INTEGER NOT NULL DEFAULT 1,
    error_type TEXT,
    error_message TEXT,
    output_hash TEXT,
    quality_score INTEGER,
    metadata TEXT
);

CREATE INDEX idx_llm_logs_workflow ON llm_logs(workflow);
CREATE INDEX idx_llm_logs_project ON llm_logs(project);
CREATE INDEX idx_llm_logs_timestamp ON llm_logs(timestamp);
CREATE INDEX idx_llm_logs_model ON llm_logs(model);

The output_hash is an MD5 of the response text, which lets you detect when a workflow is returning identical outputs across many runs, a sign something is cached or stuck. The metadata column is a JSON string for anything workflow-specific you want to log without adding columns.
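The stuck-output check described above could be sketched as a query that groups recent successful calls by hash. The function name, the 24-hour window, and the repeat threshold of 20 are assumptions to tune per workflow.

```python
import sqlite3


def repeated_outputs(conn: sqlite3.Connection, hours: int = 24, min_repeats: int = 20):
    """Return (workflow, output_hash, repeats) for hashes seen suspiciously often."""
    return conn.execute(
        """
        SELECT workflow, output_hash, COUNT(*) AS repeats
        FROM llm_logs
        WHERE success = 1
          AND output_hash IS NOT NULL
          -- datetime() normalizes the stored text before comparing; this
          -- skips the timestamp index, which is acceptable for a sketch.
          AND datetime(timestamp) >= datetime('now', ?)
        GROUP BY workflow, output_hash
        HAVING COUNT(*) >= ?
        ORDER BY repeats DESC
        """,
        (f"-{hours} hours", min_repeats),
    ).fetchall()
```

One hash worth recognizing on sight is d41d8cd98f00b204e9800998ecf8427e, the MD5 of the empty string: many repeats of it mean the workflow is "succeeding" with empty responses. Classification workflows with a small set of valid answers will repeat hashes legitimately, so the threshold needs to be set per workflow.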

The Python wrapper looks like this:

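import os
import time
import hashlib
import sqlite3
from functools import wraps
from datetime import datetime

# USD per 1,000 tokens; update when provider pricing changes.
COST_PER_1K_TOKENS = {
    "gpt-4o": {"prompt": 0.0025, "completion": 0.01},
    "gpt-4o-mini": {"prompt": 0.00015, "completion": 0.0006},
    "claude-3-5-sonnet-20241022": {"prompt": 0.003, "completion": 0.015},
    "claude-3-5-haiku-20241022": {"prompt": 0.0008, "completion": 0.004},
}

# sqlite3 does not expand "~", so resolve the home directory explicitly.
# The ~/monitoring directory must already exist.
DB_PATH = os.path.expanduser("~/monitoring/llm_logs.db")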

def calculate_cost(model: str, prompt_tokens: int, completion_tokens: int) -> float:
    rates = COST_PER_1K_TOKENS.get(model, {"prompt": 0.002, "completion": 0.002})
    return (prompt_tokens / 1000 * rates["prompt"]) + (completion_tokens / 1000 * rates["completion"])

def log_llm_call(workflow: str, project: str):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            start_time = time.time()
            success = 1
            error_type = None
            error_message = None
            prompt_tokens = 0
            completion_tokens = 0
            output_hash = None
            model = kwargs.get("model", "unknown")

            try:
                result = func(*args, **kwargs)

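                if hasattr(result, "usage"):
                    usage = result.usage
                    # OpenAI reports prompt_tokens/completion_tokens;
                    # Anthropic reports input_tokens/output_tokens.
                    prompt_tokens = getattr(usage, "prompt_tokens", None) or getattr(usage, "input_tokens", 0) or 0
                    completion_tokens = getattr(usage, "completion_tokens", None) or getattr(usage, "output_tokens", 0) or 0
                    model = getattr(result, "model", model)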

                content = ""
                if hasattr(result, "choices") and result.choices:
                    content = result.choices[0].message.content or ""
                elif hasattr(result, "content") and result.content:
                    content = result.content[0].text if result.content else ""

                output_hash = hashlib.md5(content.encode()).hexdigest()
                return result

            except Exception as e:
                success = 0
                error_type = type(e).__name__
                error_message = str(e)[:500]
                raise

            finally:
                latency_ms = int((time.time() - start_time) * 1000)
                total_tokens = prompt_tokens + completion_tokens
                cost_usd = calculate_cost(model, prompt_tokens, completion_tokens)

                try:
                    conn = sqlite3.connect(DB_PATH)
                    conn.execute("""
                        INSERT INTO llm_logs
                            (timestamp, workflow, project, model, prompt_tokens,
                             completion_tokens, total_tokens, cost_usd, latency_ms,
                             success, error_type, error_message, output_hash)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    """, (
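                        # UTC in SQLite's own "YYYY-MM-DD HH:MM:SS" format, so string
                        # comparisons against datetime('now', ...) order correctly.
                        time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime()),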
                        workflow, project, model,
                        prompt_tokens, completion_tokens, total_tokens,
                        cost_usd, latency_ms, success,
                        error_type, error_message, output_hash
                    ))
                    conn.commit()
                    conn.close()
                except Exception as db_error:
                    print(f"[monitoring] Failed to log: {db_error}")

        return wrapper
    return decorator

Wrapping an existing call is a one-liner:

@log_llm_call(workflow="lead_research", project="zeroslide")
def research_lead(lead: dict):  # returns the raw response object so the wrapper can read usage
    response = openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": build_research_prompt(lead)}]
    )
    return response

The try/finally block is important because it ensures you log even when the call fails. Failed calls are some of the most valuable data points you have.

The Monitoring Architecture

[Figure: Monitoring architecture. Every LLM call flows through the same pipeline: LLM calls (openai / anthropic), then the logging wrapper (tokens, cost, latency, success, output hash), then llm_logs (SQLite, append-only), which feeds the dashboard (SQL queries) and alerts (cron thresholds).]

The Dashboard Layer

The dashboard is not a Grafana instance or a SaaS analytics tool. It is a set of SQL queries I run when I want to understand what is happening. That is enough.

Daily cost by project, last 7 days:

SELECT
    project,
    date(timestamp) AS day,
    SUM(cost_usd) AS total_cost,
    COUNT(*) AS total_calls,
    SUM(CASE WHEN success = 0 THEN 1 ELSE 0 END) AS failed_calls
FROM llm_logs
WHERE timestamp >= date('now', '-7 days')
GROUP BY project, date(timestamp)
ORDER BY day DESC, total_cost DESC;

Error rate by workflow, last 24 hours:

SELECT
    workflow,
    COUNT(*) AS total_calls,
    SUM(CASE WHEN success = 0 THEN 1 ELSE 0 END) AS failures,
    ROUND(100.0 * SUM(CASE WHEN success = 0 THEN 1 ELSE 0 END) / COUNT(*), 1) AS error_rate_pct,
    GROUP_CONCAT(DISTINCT error_type) AS error_types
FROM llm_logs
WHERE timestamp >= datetime('now', '-24 hours')
GROUP BY workflow
ORDER BY error_rate_pct DESC;

Most expensive workflows this week:

SELECT
    workflow,
    project,
    SUM(cost_usd) AS total_cost,
    COUNT(*) AS total_runs,
    ROUND(SUM(cost_usd) / COUNT(*), 6) AS cost_per_run,
    GROUP_CONCAT(DISTINCT model) AS models_used
FROM llm_logs
WHERE timestamp >= date('now', '-7 days')
GROUP BY workflow, project
ORDER BY total_cost DESC
LIMIT 10;

Cost trend over 30 days:

SELECT
    date(timestamp) AS day,
    SUM(cost_usd) AS daily_cost,
    COUNT(*) AS total_calls,
    SUM(prompt_tokens + completion_tokens) AS total_tokens
FROM llm_logs
WHERE timestamp >= date('now', '-30 days')
GROUP BY date(timestamp)
ORDER BY day;

These four queries answer 90% of the questions worth asking: what is this costing, which workflows are failing, how spend is trending, and what should be optimized first. Latency is the one thing they leave out; percentiles over latency_ms need a separate query.

Cost by Workflow

[Figure: Weekly cost by workflow, example output from the cost breakdown query. lead_research $18.40, email_draft $11.20, summarize $6.80, quality_score $3.10, outreach_parse $1.20.]

Alerting

The alerting layer is a cron job that runs every 30 minutes, executes threshold queries, and sends a notification if anything is out of bounds.

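import os
import sqlite3
import requests
from datetime import datetime

# sqlite3 does not expand "~", so resolve the home directory explicitly.
DB_PATH = os.path.expanduser("~/monitoring/llm_logs.db")
WEBHOOK_URL = "https://hooks.slack.com/..."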

THRESHOLDS = {
    "daily_cost_usd": 15.0,
    "workflow_error_rate_pct": 20.0,
    "hourly_calls": 500,
}

def check_daily_cost():
    conn = sqlite3.connect(DB_PATH)
    row = conn.execute("""
        SELECT SUM(cost_usd) FROM llm_logs
        WHERE date(timestamp) = date('now')
    """).fetchone()
    conn.close()
    total = row[0] or 0.0
    if total > THRESHOLDS["daily_cost_usd"]:
        return f"Daily cost alert: ${total:.2f} (threshold: ${THRESHOLDS['daily_cost_usd']})"
    return None

def check_error_rates():
    conn = sqlite3.connect(DB_PATH)
    rows = conn.execute("""
        SELECT
            workflow,
            ROUND(100.0 * SUM(CASE WHEN success = 0 THEN 1 ELSE 0 END) / COUNT(*), 1) AS error_rate
        FROM llm_logs
        WHERE timestamp >= datetime('now', '-1 hour')
        GROUP BY workflow
        HAVING COUNT(*) >= 10
    """).fetchall()
    conn.close()
    alerts = []
    for workflow, rate in rows:
        if rate > THRESHOLDS["workflow_error_rate_pct"]:
            alerts.append(f"High error rate on {workflow}: {rate}%")
    return alerts

def send_alert(message: str):
    # A timeout keeps a slow webhook from hanging the cron job.
    requests.post(WEBHOOK_URL, json={"text": f"[AI Monitor] {message}"}, timeout=10)

if __name__ == "__main__":
    alerts = []
    cost_alert = check_daily_cost()
    if cost_alert:
        alerts.append(cost_alert)
    alerts.extend(check_error_rates())
    for alert in alerts:
        send_alert(alert)
        print(f"{datetime.now().isoformat()} ALERT: {alert}")

The cron entry:

*/30 * * * * /usr/bin/python3 /path/to/monitor_check.py >> /var/log/ai_monitor.log 2>&1
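The THRESHOLDS dictionary above defines hourly_calls, but the script as shown never checks it. A possible check is sketched below. It assumes the limit applies per workflow rather than across all workflows, and the function name is hypothetical.

```python
import sqlite3


def check_hourly_volume(conn: sqlite3.Connection, max_calls: int = 500):
    """Return one alert string per workflow that exceeded max_calls in the last hour."""
    rows = conn.execute(
        """
        SELECT workflow, COUNT(*) AS calls
        FROM llm_logs
        WHERE datetime(timestamp) >= datetime('now', '-1 hour')
        GROUP BY workflow
        HAVING COUNT(*) > ?
        ORDER BY calls DESC
        """,
        (max_calls,),
    ).fetchall()
    return [
        f"Volume alert on {workflow}: {calls} calls in the last hour "
        f"(threshold: {max_calls})"
        for workflow, calls in rows
    ]
```

In the alert script this would be called with an open connection and THRESHOLDS["hourly_calls"], and its result added to the alerts list the same way check_error_rates() is. Because the cron job runs every 30 minutes and the window is one hour, a sustained spike can alert twice.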

What Changed After Building This

The first thing I found was that two workflows were using GPT-4o when GPT-4o-mini would have been sufficient. One was a simple extraction task pulling structured fields from unstructured text. The other was a classification workflow assigning categories from a fixed list. Neither required frontier model capability. Switching both cut the cost of those workflows by roughly 80%. The quality scores I had logged showed no meaningful difference in output quality.

The second finding was more alarming. A lead research workflow had been failing 31% of the time for three weeks before I built the monitoring system. It was failing silently: the outer code caught the exception, returned an empty result, and moved on. The logs showed hundreds of failed calls per week, all retried once, all failing again, all charged. The bug was a prompt that had grown too long and was hitting the context limit on specific inputs. I never would have caught it without the error rate query.

The third change was subtler. I could finally answer whether a workflow was worth the cost. For outreach personalization, the cost per email was $0.04. Given the reply rates, the math was obvious. For a different summarization workflow, the cost per document was $0.18 and nobody was reading the summaries. I turned it off.

Model selection decisions became data-driven. Latency distributions showed that for the use cases I was running, the p95 difference between GPT-4o and GPT-4o-mini was about 800 milliseconds. For non-real-time workflows, that is meaningless. For anything in a user-facing path, it matters. Knowing which is which changes how you build.

The Broader Principle

You cannot improve what you cannot see. This is the same principle that makes software teams instrument their services with metrics, traces, and structured logs. The discipline transfers directly to AI workflows.

The difference is that AI workflows have an additional failure mode that traditional software does not: silent quality degradation. A web server that crashes is visible. An LLM workflow that starts producing subtly worse outputs because the model was updated or the prompt has edge cases or the context is being truncated is invisible without explicit measurement.

The SQLite approach here scales further than you might expect. A table with ten million rows is still fast for the aggregation queries above with the right indexes. You do not need a time-series database or a columnar store until you are running millions of calls per day.

Start with the SQLite table. Add the logging wrapper to your most expensive workflow first. Run the cost and error rate queries after 48 hours. You will find something you did not know.