Skip to content

Python API

The four objects re-exported from the top-level ctxrot module.

from ctxrot import (
    CtxRotCallback,     # attach to dspy.configure(callbacks=[...])
    CtxRotStore,        # read/write the SQLite database directly
    analyze_session,    # compute local rot metrics
    run_deep_analysis,  # RLM-powered semantic analysis
)

CtxRotCallback

The DSPy BaseCallback you attach to your dspy.configure(...) call. Pass store_content=True if you want the full prompt/completion text captured — it's required for repetition analysis.

Bases: BaseCallback

Captures LM and tool call data into SQLite.

Auto-creates a new session each time a top-level DSPy module run starts.

Concurrency-safe: each asyncify/streamify worker thread gets its own _SessionState via a ContextVar, so multiple concurrent agent calls do not interfere with each other.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| db_path | str | Path to SQLite database file. | 'ctxrot.db' |
| store_content | bool | If True, also store full prompt messages and completion text for context rot analysis. | False |
Source code in ctxrot/callback.py
def __init__(self, db_path: str = "ctxrot.db", store_content: bool = False) -> None:
    """Create the backing store and schedule it to be closed at exit.

    Args:
        db_path: Path to the SQLite database file, handed to ``CtxRotStore``.
        store_content: Kept on the instance as ``_store_content``.
    """
    store = CtxRotStore(db_path)
    self._store = store
    self._store_content = store_content
    # Remembered separately from the ContextVar (which is cleared on
    # module_end) so the .session_id property still answers after a run,
    # e.g. when the caller wants to pass the id on to `analyze`.
    self._last_session_id: str | None = None
    # Close the store at interpreter shutdown so SQLite gets a chance to
    # truncate-checkpoint and remove the -wal/-shm sidecars. Only a weak
    # reference is registered, so atexit alone does not keep it alive.
    atexit.register(_close_store_at_exit, weakref.ref(store))

session_id property

session_id: str | None

CtxRotStore

A thin SQLite wrapper. CtxRotCallback uses it internally, but you can also instantiate it yourself (with read_only=True for safety) when you want to query sessions programmatically without going through the CLI.

Persists LM call, tool call, and session data in SQLite.

Source code in ctxrot/storage.py
def __init__(
    self,
    db_path: str = "ctxrot.db",
    read_only: bool = False,
) -> None:
    """Open the SQLite database backing this store.

    Args:
        db_path: Path to the SQLite database file.
        read_only: When True, open the file through a ``file:`` URI with
            ``mode=ro``; no pragmas are issued and the schema is not
            touched. When False, open it normally, switch to WAL
            journaling, set a 5000 ms busy timeout and ensure the schema.
    """
    # Function-scope import: only the read-only URI branch needs it.
    from urllib.parse import quote

    self._db_path = db_path
    self._read_only = read_only
    self._lock = threading.Lock()
    self._con: sqlite3.Connection

    if read_only:
        # Percent-encode the path before embedding it in the URI. Left raw,
        # a '#' starts the fragment (silently dropping "?mode=ro" and
        # opening a different file), a '?' starts the query string, and a
        # '%' is taken as an escape. '/' and ':' are kept literal so plain
        # paths produce exactly the same URI as before.
        encoded_path = quote(str(db_path), safe="/:")
        read_only_uri = f"file:{encoded_path}?mode=ro"
        self._con = sqlite3.connect(
            read_only_uri, uri=True, check_same_thread=False
        )
    else:
        self._con = sqlite3.connect(db_path, check_same_thread=False)
        self._con.execute("PRAGMA journal_mode=WAL")
        self._con.execute("PRAGMA busy_timeout=5000")
        self._ensure_schema()

get_latest_session_id

get_latest_session_id() -> str | None
Source code in ctxrot/storage.py
def get_latest_session_id(self) -> str | None:
    row = self._con.execute(
        "SELECT id FROM sessions ORDER BY started_at DESC LIMIT 1"
    ).fetchone()
    return row[0] if row else None

get_session_ids

get_session_ids(
    since: str | None = None,
    until: str | None = None,
    terminal_state: str | None = None,
) -> list[str]

Return session IDs matching the filters, ordered by started_at ASC.

since and until are compared against started_at as ISO-8601 strings (lexicographic order matches chronological order). terminal_state is an exact match.

Source code in ctxrot/storage.py
def get_session_ids(
    self,
    since: str | None = None,
    until: str | None = None,
    terminal_state: str | None = None,
) -> list[str]:
    """Return session IDs matching the filters, ordered by started_at ASC.

    `since` and `until` are compared against `started_at` as ISO-8601 strings
    (lexicographic order matches chronological order). `terminal_state` is an
    exact match.
    """
    clauses: list[str] = []
    params: list = []
    if since is not None:
        clauses.append("started_at >= ?")
        params.append(since)
    if until is not None:
        clauses.append("started_at <= ?")
        params.append(until)
    if terminal_state is not None:
        clauses.append("terminal_state = ?")
        params.append(terminal_state)
    where = f"WHERE {' AND '.join(clauses)}" if clauses else ""
    rows = self._con.execute(
        f"SELECT id FROM sessions {where} ORDER BY started_at ASC",
        params,
    ).fetchall()
    return [r[0] for r in rows]

get_session

get_session(session_id: str) -> dict | None
Source code in ctxrot/storage.py
def get_session(self, session_id: str) -> dict | None:
    row = self._con.execute(
        "SELECT id, started_at, ended_at, model, mode, terminal_state "
        "FROM sessions WHERE id = ?",
        [session_id],
    ).fetchone()
    if not row:
        return None
    return {
        "id": row[0],
        "started_at": row[1],
        "ended_at": row[2],
        "model": row[3],
        "mode": row[4],
        "terminal_state": row[5],
    }

get_session_summary

get_session_summary(session_id: str) -> dict
Source code in ctxrot/storage.py
def get_session_summary(self, session_id: str) -> dict:
    """Aggregate token, cost and duration totals over a session's LM calls.

    Returns:
        A dict with total_calls, total_prompt, total_completion,
        total_cache_read, cache_hit_pct (cache-read tokens as a percentage
        of prompt tokens, rounded to one decimal), total_cost,
        total_duration_ms and max_prompt_tokens. A session with no LM calls
        yields zeros throughout.
    """
    cursor = self._con.execute(
        """SELECT
             COUNT(*) as total_calls,
             COALESCE(SUM(prompt_tokens), 0) as total_prompt,
             COALESCE(SUM(completion_tokens), 0) as total_completion,
             COALESCE(SUM(cache_read_tokens), 0) as total_cache_read,
             COALESCE(SUM(cost), 0) as total_cost,
             COALESCE(SUM(duration_ms), 0) as total_duration_ms,
             MAX(prompt_tokens) as max_prompt_tokens
           FROM lm_calls WHERE session_id = ?""",
        [session_id],
    )
    (
        call_count,
        prompt_sum,
        completion_sum,
        cache_read_sum,
        cost_sum,
        duration_sum,
        prompt_peak,
    ) = cursor.fetchone()

    # Avoid dividing by zero when the session recorded no prompt tokens.
    if prompt_sum > 0:
        hit_pct = cache_read_sum / prompt_sum * 100
    else:
        hit_pct = 0.0

    return {
        "total_calls": call_count,
        "total_prompt": prompt_sum,
        "total_completion": completion_sum,
        "total_cache_read": cache_read_sum,
        "cache_hit_pct": round(hit_pct, 1),
        "total_cost": cost_sum,
        "total_duration_ms": duration_sum,
        # MAX() is NULL when there are no rows; report that as 0.
        "max_prompt_tokens": prompt_peak or 0,
    }

get_growth_data

get_growth_data(session_id: str) -> list[dict]
Source code in ctxrot/storage.py
def get_growth_data(self, session_id: str) -> list[dict]:
    """Return per-call token and cost figures for a session, ordered by seq.

    Each dict carries seq, prompt_tokens, completion_tokens,
    cache_read_tokens, cache_write_tokens, cost and model.
    """
    fields = (
        "seq",
        "prompt_tokens",
        "completion_tokens",
        "cache_read_tokens",
        "cache_write_tokens",
        "cost",
        "model",
    )
    cursor = self._con.execute(
        """SELECT seq, prompt_tokens, completion_tokens,
                  cache_read_tokens, cache_write_tokens, cost, model
           FROM lm_calls WHERE session_id = ? ORDER BY seq""",
        [session_id],
    )
    # `fields` mirrors the SELECT column order, so zip pairs them correctly.
    return [dict(zip(fields, record)) for record in cursor.fetchall()]

get_tool_impact

get_tool_impact(session_id: str) -> list[dict]
Source code in ctxrot/storage.py
def get_tool_impact(self, session_id: str) -> list[dict]:
    """Summarise tool usage for a session, one entry per tool name.

    Each dict carries tool_name, call_count and avg_tokens (the average of
    output_tokens_est cast to an integer). Entries are ordered by
    avg_tokens, largest first.
    """
    cursor = self._con.execute(
        """SELECT tool_name, COUNT(*) as call_count,
                  CAST(AVG(output_tokens_est) AS INTEGER) as avg_tokens
           FROM tool_calls WHERE session_id = ?
           GROUP BY tool_name ORDER BY avg_tokens DESC""",
        [session_id],
    )
    impact = []
    for tool_name, call_count, avg_tokens in cursor.fetchall():
        impact.append(
            {
                "tool_name": tool_name,
                "call_count": call_count,
                "avg_tokens": avg_tokens,
            }
        )
    return impact

get_lm_call_content

get_lm_call_content(session_id: str) -> list[dict]

Return stored LM call content for a session, ordered by seq.

Source code in ctxrot/storage.py
def get_lm_call_content(self, session_id: str) -> list[dict]:
    """Return the stored LM call content of a session, ordered by seq.

    Only calls whose messages_json is not NULL are included. If the query
    raises sqlite3.OperationalError, an empty list is returned instead.
    """
    fields = (
        "seq",
        "messages_json",
        "completion",
        "prompt_char_count",
        "completion_char_count",
        "prompt_tokens",
        "completion_tokens",
    )
    try:
        cursor = self._con.execute(
            """SELECT seq, messages_json, completion,
                      prompt_char_count, completion_char_count,
                      prompt_tokens, completion_tokens
               FROM lm_calls
               WHERE session_id = ? AND messages_json IS NOT NULL
               ORDER BY seq""",
            [session_id],
        )
        fetched = cursor.fetchall()
    except sqlite3.OperationalError:
        # NOTE(review): presumably covers databases lacking the content
        # columns — confirm against the schema migration code.
        return []
    return [dict(zip(fields, record)) for record in fetched]

get_tool_call_content

get_tool_call_content(session_id: str) -> list[dict]

Return stored tool call content for a session.

Source code in ctxrot/storage.py
def get_tool_call_content(self, session_id: str) -> list[dict]:
    """Return the stored tool call content of a session, ordered by row id.

    Only calls whose input_json is not NULL are included. If the query
    raises sqlite3.OperationalError, an empty list is returned instead.
    """
    try:
        cursor = self._con.execute(
            """SELECT id, input_json, output_text, output_char_count
               FROM tool_calls
               WHERE session_id = ? AND input_json IS NOT NULL
               ORDER BY id""",
            [session_id],
        )
        fetched = cursor.fetchall()
    except sqlite3.OperationalError:
        # NOTE(review): presumably covers databases lacking the content
        # columns — confirm against the schema migration code.
        return []

    content = []
    for call_id, input_json, output_text, output_char_count in fetched:
        content.append(
            {
                # The row's `id` column is exposed under this key.
                "tool_call_id": call_id,
                "input_json": input_json,
                "output_text": output_text,
                "output_char_count": output_char_count,
            }
        )
    return content

truncate_all

truncate_all() -> None
Source code in ctxrot/storage.py
def truncate_all(self) -> None:
    """Delete every row from lm_calls, tool_calls and sessions.

    The three deletes are applied as a unit: the connection's context
    manager commits when all of them succeed and rolls the open transaction
    back if any of them raises, so a failure cannot leave the tables
    half-emptied or a transaction dangling on the connection.

    Raises:
        sqlite3.Error: Propagated unchanged from a failing DELETE, after
            the rollback.
    """
    with self._con:
        self._con.execute("DELETE FROM lm_calls")
        self._con.execute("DELETE FROM tool_calls")
        self._con.execute("DELETE FROM sessions")

close

close() -> None
Source code in ctxrot/storage.py
def close(self) -> None:
    """Close the underlying connection; calling it again is a no-op.

    For a writable store the journal mode is switched to DELETE first. A
    failure of that switch is logged at debug level and does not prevent
    the connection from being closed.
    """
    connection = getattr(self, "_con", None)
    if connection is None:
        return
    # Drop the attribute up front so a second close() finds nothing to do.
    del self._con
    writable = not self._read_only
    if writable:
        # NOTE(review): presumably leaves WAL mode so SQLite can remove the
        # -wal/-shm sidecar files — confirm.
        try:
            connection.execute("PRAGMA journal_mode=DELETE")
        except sqlite3.Error as exc:
            _log.debug("journal_mode switch failed on close: %s", exc)
    connection.close()

analyze_session

Compute local repetition + efficiency metrics for one session. Returns a plain dict that's safe to json.dumps.

Analyze a session for context rot signals.

Returns a dict with:
  • "has_content": whether store_content data was available
  • "repetition": per-iteration repetition scores (or None)
  • "efficiency": per-iteration efficiency ratios
  • "summary": human-readable summary dict
  • "session": session metadata (id, started_at, ended_at, model, mode, terminal_state)
Source code in ctxrot/analysis.py
def analyze_session(store: CtxRotStore, session_id: str) -> dict:
    """Compute context rot signals for one stored session.

    Args:
        store: Store to read the session, growth data and LM call content
            from.
        session_id: Id of the session to analyze.

    Returns:
        A dict with:
          - "has_content": whether store_content data was available
          - "repetition": per-iteration repetition scores (or None)
          - "efficiency": per-iteration efficiency ratios
          - "summary": human-readable summary dict
          - "session": session metadata (id, started_at, ended_at, model,
                       mode, terminal_state)
    """
    session_meta = store.get_session(session_id)
    growth_rows = store.get_growth_data(session_id)
    lm_content = store.get_lm_call_content(session_id)

    efficiency_scores = _compute_efficiency(growth_rows)
    # Repetition needs the stored prompt/completion text; without it the
    # field stays None.
    repetition_scores = _compute_repetition(lm_content) if lm_content else None

    analysis: dict = {
        "has_content": bool(lm_content),
        "repetition": repetition_scores,
        "efficiency": efficiency_scores,
        "summary": {},
        "session": session_meta,
    }
    # The summary is built from the otherwise complete result, so it is
    # filled in last.
    analysis["summary"] = _build_summary(analysis)
    return analysis

run_deep_analysis

Kick off an RLM-powered deep analysis. Requires Deno and an API key. See Deep analysis for the full workflow and the CLI wrapper.

Run RLM-powered deep analysis on a session.

API credentials are resolved in order:
  1. Explicit api_key / api_base parameters
  2. OPENAI_API_KEY / OPENAI_API_BASE environment variables
  3. API_KEY / API_BASE environment variables
env_file (default: .env) is loaded before these environment variables are read.
Returns a dict with:
  • "report": markdown analysis report
  • "trajectory": RLM's REPL interaction history
  • "session_id": the analyzed session ID
  • "missing_sections": list of required report sections not found
Source code in ctxrot/deep_analysis.py
def run_deep_analysis(
    store: CtxRotStore,
    session_id: str,
    query: str = "Perform a comprehensive context rot analysis.",
    main_model: str = "openai/gpt-5.4",
    sub_model: str = "openai/gpt-5.4-mini",
    max_iterations: int = 15,
    max_llm_calls: int = 30,
    verbose: bool = False,
    api_key: str | None = None,
    api_base: str | None = None,
    env_file: str | None = ".env",
) -> dict[str, Any]:
    """Run RLM-powered deep analysis on a session.

    The API key and API base are each resolved independently; the first
    non-empty value wins:
      1. Explicit api_key / api_base parameters
      2. OPENAI_API_KEY / OPENAI_API_BASE in os.environ
      3. API_KEY / API_BASE in os.environ

    When env_file is truthy it is passed to _load_env_file before the
    environment is read. NOTE(review): whether values from the file
    override variables already set in the process depends on
    _load_env_file, which is not shown here — confirm.

    Args:
        store: Store the session data is read from.
        session_id: Id of the session to analyze.
        query: Passed to the RLM as ``analysis_query``.
        main_model: Model name for the LM active while the RLM runs.
        sub_model: Model name for the LM passed to the RLM as ``sub_lm``.
        max_iterations: Forwarded to ``dspy.RLM``.
        max_llm_calls: Forwarded to both the signature builder and
            ``dspy.RLM``.
        verbose: Forwarded to ``dspy.RLM``.
        api_key: Explicit API key; takes precedence over the environment.
        api_base: Explicit API base; takes precedence over the environment.
        env_file: Env file to load first; a falsy value skips loading.

    Returns a dict with:
      - "report": markdown analysis report
      - "trajectory": RLM's REPL interaction history
      - "session_id": the analyzed session ID
      - "missing_sections": list of required report sections not found

    Raises:
        RuntimeError: If check_deno_available() reports that Deno is missing.
    """
    # Fail fast, before any data is loaded or any LM is created.
    if not check_deno_available():
        msg = (
            "Deno runtime not found. "
            "RLM requires Deno for its sandboxed Python interpreter.\n"
            "Install: https://deno.land/#installation\n"
            "(e.g., curl -fsSL https://deno.land/install.sh | sh)"
        )
        raise RuntimeError(msg)

    import os

    # Load .env before creating LM instances (and before os.environ is read
    # for credentials below).
    if env_file:
        _load_env_file(env_file)

    rlm_data, full_content, full_tool_content = prepare_session_data(
        store, session_id
    )

    # Resolve credentials: explicit param > OPENAI_* env > API_* env
    resolved_key = (
        api_key or os.environ.get("OPENAI_API_KEY") or os.environ.get("API_KEY")
    )
    resolved_base = (
        api_base or os.environ.get("OPENAI_API_BASE") or os.environ.get("API_BASE")
    )

    # Only pass credentials that were actually resolved; an unresolved value
    # is omitted rather than sent as None.
    lm_kwargs: dict[str, Any] = {}
    if resolved_key:
        lm_kwargs["api_key"] = resolved_key
    if resolved_base:
        lm_kwargs["api_base"] = resolved_base

    # Both LMs share the same credentials and are created with cache=False.
    main_lm = dspy.LM(main_model, cache=False, **lm_kwargs)
    sub_lm = dspy.LM(sub_model, cache=False, **lm_kwargs)

    tools = _make_tools(full_content, full_tool_content)
    sig_cls = _build_signature(max_llm_calls=max_llm_calls)
    # NOTE(review): presumably here to narrow the type for checkers; the
    # return type of _build_signature is not visible from this block.
    assert isinstance(sig_cls, type) and issubclass(sig_cls, dspy.Signature)

    # main_lm is active only inside this context; sub_lm is handed to the
    # RLM explicitly.
    with dspy.context(lm=main_lm):
        rlm = dspy.RLM(
            signature=sig_cls,
            max_iterations=max_iterations,
            max_llm_calls=max_llm_calls,
            tools=tools,
            sub_lm=sub_lm,
            verbose=verbose,
        )

        result = rlm(
            session_data=rlm_data,
            analysis_query=query,
        )

    report = result.analysis_report
    # Fall back to an empty list when the result has no trajectory attribute.
    trajectory = getattr(result, "trajectory", [])
    has_content = bool(full_content)
    missing = _validate_report(report, has_content)

    return {
        "report": report,
        "trajectory": trajectory,
        "session_id": session_id,
        "missing_sections": missing,
    }