
rust-sqlite-ingestor-low-ram

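Build or tune a Rust SQLite data ingestor using byte-scan parsing, mode-specific SQLite PRAGMA settings, an exec-based live restart, and a minimal index set: it ingests massive JSONL logs at high speed in historical mode and keeps memory tightly controlled in live mode, always staying within a strict RSS budget.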

SKILL.md
---
name: rust-sqlite-ingestor-low-ram
description: Create or tune a Rust SQLite ingestor that ingests huge JSONL logs fast in historical mode and stays under a strict RSS budget in live mode using byte-scan parsing, mode-specific SQLite PRAGMAs, exec-based live restart, and a minimal index set.
---

Rust SQLite Ingestor (Low RAM Live Mode)

[Created by Codex: 019bef79-6ce1-7240-8260-a3f593daa9b6 2026-01-24]

Build or tune a Rust-based SQLite ingestor for massive JSONL streams with:

  • Historical mode: maximize throughput for bulk backfills
  • Live mode: enforce a strict RSS budget (prefer hard reset via exec() when possible)

Use Cases

  • Implement high-throughput ingestion of sse_lines.jsonl (Codex and Claude) into SQLite.
  • Debug or improve an existing ingestor that is too slow (JSON parsing, per-line commits, too many indexes).
  • Enforce low live-mode RAM even under bursty backfill periods.

Workflow (Recommended)

  1. Define the envelope schema and keep row writes simple (avoid per-event state machines unless strictly needed).
  2. Implement byte-scan extraction (no serde_json per line).
  3. Separate historical and live modes:
    • Historical: throughput-first PRAGMAs + larger cache + large transactions
    • Live: safety + strict RSS PRAGMAs + small cache + smaller transactions
  4. Restrict indexes to a short allowlist; avoid indexing large TEXT fields.
  5. Implement reliable historical→live transition:
    • File follow: prefer exec() into a fresh live process at a byte offset to guarantee RSS drops.
    • Stdin follow: use poll() idle detection; accept that exec() restart is not available.
  6. Validate the transition with a stress test before declaring success: ingest 3M lines, then append another 3M while the ingestor is following the file.

Reference (Main Meat)

This section is adapted from /tmp/knowledge-base/high-performance-sqlite-ingestor.md and describes the concrete techniques used in the reference implementation.

High-Performance SQLite Ingestor: Technical Guide

[Created by Claude: ab7e3fa0-6c2e-49c0-a55f-21779f70b64a 2026-01-25] [Edited by Codex: 019bef79-6ce1-7240-8260-a3f593daa9b6 2026-01-24]

A deep dive into the techniques sqlite-rust-ingestor uses to reach roughly 450k-570k lines/sec in historical mode and ~15MB RSS in steady-state live mode.


Performance Results

| Mode | Throughput | RAM usage (RSS) |
|---|---|---|
| Historical | 450k-570k lines/sec | ~1-6GB (configurable) |
| Live (idle/steady state) | Real-time streaming | ~15MB |
| Live (burst backfill) | Depends on commit/index settings | Can spike into the hundreds of MB |

RSS definition: The tool reports RSS via sysinfo (Process::memory(), which returns bytes), i.e. resident set size (physical RAM currently held by the process). Cross-check with ps -o rss= -p <pid> (reported in KiB) or Activity Monitor.
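On Linux, a dependency-free way to cross-check the number (not what the tool itself does; it uses sysinfo) is to read the VmRSS line of /proc/self/status, which the kernel reports in kB (KiB). A minimal std-only sketch; `parse_vmrss_kib` and `current_rss_kib` are hypothetical helper names:

```rust
use std::fs;

/// Extracts the VmRSS value (in KiB) from the text of /proc/<pid>/status.
/// Returns None if the line is missing or malformed.
fn parse_vmrss_kib(status: &str) -> Option<u64> {
    let line = status.lines().find(|l| l.starts_with("VmRSS:"))?;
    // Line format: "VmRSS:\t   15320 kB"
    line["VmRSS:".len()..]
        .split_whitespace()
        .next()?
        .parse()
        .ok()
}

/// Current process RSS in KiB (Linux only; None elsewhere or on read error).
fn current_rss_kib() -> Option<u64> {
    let status = fs::read_to_string("/proc/self/status").ok()?;
    parse_vmrss_kib(&status)
}
```

Divide by 1024 for MiB; the value is directly comparable with ps -o rss=, which also reports KiB.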


Architecture Overview

code
┌─────────────────────────────────────────────────────────────────┐
│                        INPUT LAYER                               │
│  ┌─────────────┐    ┌─────────────┐                             │
│  │   stdin     │    │    file     │  (with --follow support)    │
│  │  (pipe)     │    │   reader    │                             │
│  └──────┬──────┘    └──────┬──────┘                             │
│         │                  │                                     │
│         └────────┬─────────┘                                     │
│                  ▼                                               │
│         ┌───────────────┐                                        │
│         │ StreamReader  │  stdin: poll()-based idle detection    │
│         │ (reader.rs)   │  file: EOF + --follow loop             │
│         └───────┬───────┘                                        │
└─────────────────┼───────────────────────────────────────────────┘
                  │
                  ▼  raw bytes (no JSON parsing!)
┌─────────────────────────────────────────────────────────────────┐
│                      PARSER LAYER                                │
│         ┌───────────────────────────────────────┐               │
│         │  Byte-level field extraction          │               │
│         │  • memchr::memmem for needle search   │               │
│         │  • Zero-copy &[u8] slices             │               │
│         │  • No serde_json, no allocation       │               │
│         └───────────────────────────────────────┘               │
└─────────────────┬───────────────────────────────────────────────┘
                  │
                  ▼  extracted fields as &str slices
┌─────────────────────────────────────────────────────────────────┐
│                     SQLITE LAYER                                 │
│         ┌───────────────────────────────────────┐               │
│         │  Ingestor (sqlite.rs)                 │               │
│         │  • Mode-specific PRAGMAs              │               │
│         │  • Batch commits (50k hist / 5k live) │               │
│         │  • Deferred index creation            │               │
│         │  • Drop unwanted indexes on startup   │               │
│         │  • PRAGMA shrink_memory (best-effort) │               │
│         └───────────────────────────────────────┘               │
└─────────────────────────────────────────────────────────────────┘

Core Technique #1: No JSON Parsing

The biggest performance win comes from never parsing JSON. Instead, use memchr/memmem for byte-level needle search.

The Problem with JSON Parsing

rust
// SLOW: a full parse validates syntax, unescapes strings and builds a Value tree
let obj: serde_json::Value = serde_json::from_slice(line)?;  // allocates a Map plus a String per key/value
let event = obj["event"].as_str();  // cheap lookup; the cost was already paid in from_slice

The Solution: Needle Search

rust
// FAST: Find byte pattern directly, return slice (zero allocation)
use memchr::memmem;

pub const EVENT_NEEDLE: &[u8] = b"\"event\":\"";

#[inline]
pub fn find_top_level_json_string_bytes<'a>(line: &'a [u8], needle: &[u8]) -> Option<&'a [u8]> {
    let idx = memmem::find(line, needle)?;
    let mut i = idx + needle.len();
    let start = i;
    let mut escaped = false;

    while i < line.len() {
        // SAFETY: the `while` condition guarantees i < line.len().
        let b = unsafe { *line.get_unchecked(i) };
        if escaped {
            escaped = false;
            i += 1;
            continue;
        }
        if b == b'\\' {
            escaped = true;
            i += 1;
            continue;
        }
        if b == b'"' {
            return Some(&line[start..i]);  // Zero-copy slice!
        }
        i += 1;
    }
    None
}

Key Points

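  1. memmem::find uses SIMD-accelerated substring search where the CPU supports it
  2. Returns a &[u8] slice: no allocation, it points directly into the input buffer
  3. unsafe { *line.get_unchecked(i) } skips the bounds check in the hot loop; the while condition keeps i in range
  4. Handles escapes: correctly finds the closing " even with \" inside, but the returned bytes are still escaped (no unescaping is done)
  5. Assumes compact JSON: the needle has no whitespace around the colon, so "event": "x" is not matched
  6. "Top level" is assumed, not checked: the first occurrence of the needle wins, even if it sits inside a nested object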
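The scanner's behaviour can be checked without the memchr crate by swapping memmem::find for a naive `windows().position()` search (slower, same result). This std-only sketch mirrors the function above with safe indexing; `find_sub` and `find_json_string_bytes` are illustrative names. It shows that the returned slice stays escaped and converts to &str with `std::str::from_utf8`:

```rust
/// Naive stand-in for memchr::memmem::find (O(n*m), for illustration only).
fn find_sub(haystack: &[u8], needle: &[u8]) -> Option<usize> {
    if needle.is_empty() {
        return Some(0); // windows(0) would panic
    }
    haystack.windows(needle.len()).position(|w| w == needle)
}

/// Same scan as find_top_level_json_string_bytes, with safe indexing.
fn find_json_string_bytes<'a>(line: &'a [u8], needle: &[u8]) -> Option<&'a [u8]> {
    let start = find_sub(line, needle)? + needle.len();
    let mut escaped = false;
    for (i, &b) in line[start..].iter().enumerate() {
        if escaped {
            escaped = false; // skip the byte after a backslash
        } else if b == b'\\' {
            escaped = true;
        } else if b == b'"' {
            return Some(&line[start..start + i]); // unescaped quote closes the value
        }
    }
    None // no closing quote: truncated line
}

const EVENT_NEEDLE: &[u8] = b"\"event\":\"";
```

In the real ingestor, memmem::find (or a memmem::Finder built once per needle and reused across lines) takes the place of `find_sub`.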

Pre-defined Needles

rust
pub const EVENT_NEEDLE: &[u8] = b"\"event\":\"";
pub const T_NEEDLE: &[u8] = b"\"t\":\"";
pub const SID_NEEDLE: &[u8] = b"\"sid\":\"";
pub const METADATA_NEEDLE: &[u8] = b"\"metadata\":\"";
pub const FLOW_ID_NEEDLE: &[u8] = b"\"flow_id\":\"";

// For escaped nested JSON (metadata contains escaped JSON string)
pub const ESC_META_PID_NEEDLE: &[u8] = b"\\\"pid\\\":";
pub const ESC_META_CWD_NEEDLE: &[u8] = b"\\\"cwd\\\":\\\"";
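The escaped needles need a different terminator: inside the metadata string, a nested string value ends at an escaped quote (`\"`), so the generic scanner above runs past it (for `\"cwd\":\"/x\"}"` it returns `/x\"}`). A hypothetical std-only sketch of two helpers for these needles; the reference implementation's own helpers are not shown in this guide, and `find_sub` is a naive stand-in for memmem::find:

```rust
const ESC_META_PID_NEEDLE: &[u8] = b"\\\"pid\\\":";
const ESC_META_CWD_NEEDLE: &[u8] = b"\\\"cwd\\\":\\\"";

/// Naive stand-in for memchr::memmem::find.
fn find_sub(haystack: &[u8], needle: &[u8]) -> Option<usize> {
    if needle.is_empty() {
        return Some(0);
    }
    haystack.windows(needle.len()).position(|w| w == needle)
}

/// Parses the unsigned integer that follows the needle (e.g. `\"pid\":`).
fn find_escaped_u64(line: &[u8], needle: &[u8]) -> Option<u64> {
    let start = find_sub(line, needle)? + needle.len();
    let digits = line[start..].iter().take_while(|b| b.is_ascii_digit()).count();
    if digits == 0 {
        return None;
    }
    // Digits are ASCII, so from_utf8 cannot fail; parse fails only on overflow.
    std::str::from_utf8(&line[start..start + digits]).ok()?.parse().ok()
}

/// Returns the raw bytes of a nested string value that ends at an escaped quote.
/// Inside the outer JSON string every escape is two bytes: `\\` encodes a
/// nested backslash and `\"` encodes a nested quote.
fn find_escaped_string<'a>(line: &'a [u8], needle: &[u8]) -> Option<&'a [u8]> {
    let start = find_sub(line, needle)? + needle.len();
    let mut i = start;
    let mut nested_escaped = false; // previous nested char was a backslash
    while i < line.len() {
        match line[i] {
            b'\\' => {
                let next = *line.get(i + 1)?;
                if next == b'"' && !nested_escaped {
                    return Some(&line[start..i]); // nested closing quote
                }
                // `\\` starts a nested escape unless it is itself escaped;
                // any other pair (escaped `\"`, `\n`, ...) ends the escape.
                nested_escaped = next == b'\\' && !nested_escaped;
                i += 2;
            }
            b'"' => return None, // unescaped quote: the outer string ended
            _ => {
                nested_escaped = false;
                i += 1;
            }
        }
    }
    None
}
```

As with the top-level scanner, the returned cwd bytes are still escaped (twice, for nested quotes and backslashes); unescape only if the column needs the literal path.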

Core Technique #2: Mode-Specific SQLite PRAGMAs

Historical Mode (Speed Over Safety)

rust
fn apply_mode_pragmas(&mut self, historical: bool) -> Result<()> {
    if historical {
        self.conn.execute_batch(
            "PRAGMA journal_mode=OFF;\n\
             PRAGMA synchronous=OFF;\n\
             PRAGMA temp_store=FILE;\n\
             PRAGMA mmap_size=0;"
        )?;
        // Large cache for historical: ~1GB default
        let kb = self.cfg.historical_cache_mb.saturating_mul(1024);
        self.conn.execute_batch(&format!("PRAGMA cache_size = -{};", kb))?;
    }
    // ...
}
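| Setting | Meaning | Effect |
|---|---|---|
| journal_mode=OFF | No rollback journal | Maximum speed; a crash mid-transaction can corrupt the DB, and ROLLBACK behaves in an undefined way |
| synchronous=OFF | No fsync | Data is handed to the OS unsynced; an OS crash or power loss can corrupt the DB |
| cache_size=-1048576 | ~1GB page cache (negative values are KiB) | Keeps hot pages in memory |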

Live Mode (Safety + Low RAM)

rust
self.conn.execute_batch(
    "PRAGMA journal_mode=WAL;\n\
     PRAGMA synchronous=NORMAL;\n\
     PRAGMA temp_store=FILE;\n\
     PRAGMA mmap_size=0;\n\
     PRAGMA wal_autocheckpoint=1000;"
)?;
// Small cache for live: ~16MB default
let kb = self.cfg.live_cache_mb.saturating_mul(1024);
self.conn.execute_batch(&format!("PRAGMA cache_size = -{};", kb))?;
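| Setting | Meaning | Effect |
|---|---|---|
| journal_mode=WAL | Write-ahead log | Readers run concurrently with the writer; crash safe |
| synchronous=NORMAL | fsync at checkpoints, not on every commit | DB stays consistent; the most recent commits may roll back after power loss |
| cache_size=-16384 | ~16MB page cache | Minimal memory footprint |
| wal_autocheckpoint=1000 | Checkpoint once the WAL reaches 1000 pages (SQLite's default) | Keeps the WAL small unless long-lived readers block checkpoints |
| temp_store=FILE (both modes) | Temporary tables and indices stored in files | Limits heap used by sorts and temp tables |
| mmap_size=0 (both modes) | Memory-mapped I/O disabled | Mapped DB pages don't add to RSS |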
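Both PRAGMA sets can come from one function, which also makes the cache_size arithmetic explicit: a negative value is a size in KiB, so N MB becomes -(N × 1024). A std-only sketch; `mode_pragmas` is a hypothetical helper, and its output would be passed to rusqlite's `Connection::execute_batch` as in the snippets above:

```rust
/// Builds the PRAGMA batch for one mode. `cache_mb` is the page-cache budget
/// in MiB; SQLite reads a negative cache_size as KiB rather than pages.
fn mode_pragmas(historical: bool, cache_mb: u64) -> String {
    let mode = if historical {
        // Throughput first: no rollback journal, no fsync.
        "PRAGMA journal_mode=OFF;\nPRAGMA synchronous=OFF;\n"
    } else {
        // Crash-safe and bounded: WAL, fsync at checkpoints.
        "PRAGMA journal_mode=WAL;\nPRAGMA synchronous=NORMAL;\nPRAGMA wal_autocheckpoint=1000;\n"
    };
    // Shared by both modes: temp data in files, no memory-mapped I/O.
    let common = "PRAGMA temp_store=FILE;\nPRAGMA mmap_size=0;\n";
    let kib = cache_mb.saturating_mul(1024);
    format!("{mode}{common}PRAGMA cache_size = -{kib};")
}
```

With the defaults mentioned above, `mode_pragmas(true, 1024)` ends in `cache_size = -1048576` (~1GB) and `mode_pragmas(false, 16)` in `cache_size = -16384` (~16MB).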

Core Technique #3: RAM Release on Transition

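The critical moment is the historical→live transition. Without explicit action, the process keeps the memory it grew during historical mode (a page cache of up to ~1GB plus allocator overhead), so RSS can stay at GB levels after switching to live mode.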

The shrink_memory PRAGMA

rust
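pub fn shrink_for_live_mode(&mut self) -> Result<()> {
    // Best-effort: frees as much of this connection's memory as it can
    // (sqlite3_db_release_memory). Run after applying the live cache_size; the
    // allocator may keep freed memory, so RSS is not guaranteed to drop.
    self.conn.execute_batch("PRAGMA shrink_memory;")?;
    Ok(())
}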

The Nuclear Option: exec() into Fresh Process

For guaranteed RAM release, the ingestor can exec() into a fresh copy of itself (file-follow only):

rust
fn exec_into_live(args: &Args, start_offset: u64) -> Result<()> {
    use std::os::unix::process::CommandExt;

    // Precondition: commit the final historical transaction (and drop the
    // connection) first. exec() runs no destructors, so uncommitted rows and
    // unflushed buffers are lost. start_offset must point just past the last
    // fully ingested line.
    let exe = std::env::current_exe()?;
    let mut cmd = std::process::Command::new(exe);
    cmd.arg("--input").arg(&args.input)
       .arg("--follow")
       .arg("--start-offset").arg(start_offset.to_string())
       .arg("--db").arg(&args.db)
       .arg("--live-cache-mb").arg(args.live_cache_mb.to_string())
       .arg("--no-exec-live");  // Prevent infinite exec loop

    // exec() replaces the current process image and returns only on failure
    let err = cmd.exec();
    Err(anyhow::Error::new(err))
}

Why this works: A fresh process starts with minimal RSS. The allocator and SQLite cache from historical mode are completely gone.
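The handoff is only lossless if --start-offset is the byte offset just past the last line that was fully ingested and committed before exec(); a trailing partial line must not be counted. A std-only sketch of that bookkeeping, where `read_complete_lines` is a hypothetical helper and any `Read + Seek` source (a `File`, or a `Cursor` in tests) stands in for the followed file:

```rust
use std::io::{BufRead, BufReader, Read, Seek, SeekFrom};

/// Seeks to `start_offset`, reads every complete ('\n'-terminated) line, and
/// returns them with the offset just past the last complete line. A trailing
/// partial line is left unread, so the returned offset is always a safe
/// --start-offset for a restarted process.
fn read_complete_lines<R: Read + Seek>(
    mut src: R,
    start_offset: u64,
) -> std::io::Result<(Vec<Vec<u8>>, u64)> {
    src.seek(SeekFrom::Start(start_offset))?;
    let mut reader = BufReader::new(src);
    let mut offset = start_offset;
    let mut lines = Vec::new();
    let mut buf = Vec::new();
    loop {
        buf.clear();
        let n = reader.read_until(b'\n', &mut buf)?;
        if n == 0 || buf.last() != Some(&b'\n') {
            break; // EOF, or a line that is still being written
        }
        offset += n as u64;
        buf.pop(); // drop the '\n'
        lines.push(buf.clone());
    }
    Ok((lines, offset))
}
```

Because the offset only advances on complete lines, restarting at it never splits a JSON record, even when the writer was mid-line at the moment of the handoff.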


Core Technique #4: Efficient Batch Commits

rust
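// FAST: Batch commits
use std::time::{Duration, Instant};

const HISTORICAL_MAX_INSERTS_PER_TX: u64 = 50_000;
const LIVE_MAX_INSERTS_PER_TX: u64 = 5_000;

// Also time-based commits to bound latency
const HISTORICAL_COMMIT_MS: u64 = 1_000;
const LIVE_COMMIT_MS: u64 = 50;

let (max_inserts_per_tx, commit_interval) = if historical {
    (HISTORICAL_MAX_INSERTS_PER_TX, Duration::from_millis(HISTORICAL_COMMIT_MS))
} else {
    (LIVE_MAX_INSERTS_PER_TX, Duration::from_millis(LIVE_COMMIT_MS))
};

// Commit on whichever limit is hit first
if inserts_since_commit >= max_inserts_per_tx
   || last_commit.elapsed() >= commit_interval {
    ingestor.flush_and_commit()?;
    last_commit = Instant::now();
    inserts_since_commit = 0;
}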
| Mode | Max inserts per transaction | Max time between commits |
|---|---|---|
| Historical | 50,000 | 1 s |
| Live | 5,000 | 50 ms |
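If the commit check only runs after an insert, a live-mode row inserted just before input goes quiet stays uncommitted (invisible to readers, lost on a crash) until the next line arrives. One way to close that gap, an assumption rather than a description of the reference implementation, is to evaluate the same policy on every read timeout too and commit only when something is pending. A std-only sketch with a hypothetical `CommitPolicy` type:

```rust
use std::time::Duration;

/// Count- and time-based commit policy (hypothetical helper).
#[derive(Clone, Copy, Debug)]
struct CommitPolicy {
    max_inserts: u64,
    max_interval: Duration,
}

impl CommitPolicy {
    const HISTORICAL: CommitPolicy = CommitPolicy {
        max_inserts: 50_000,
        max_interval: Duration::from_millis(1_000),
    };
    const LIVE: CommitPolicy = CommitPolicy {
        max_inserts: 5_000,
        max_interval: Duration::from_millis(50),
    };

    /// `pending`: rows inserted since the last commit; `since_last`: time since it.
    /// Meant to be called after each insert and on each idle read timeout.
    fn should_commit(&self, pending: u64, since_last: Duration) -> bool {
        pending > 0 && (pending >= self.max_inserts || since_last >= self.max_interval)
    }
}
```

The `pending > 0` guard keeps an idle live process from issuing empty commits every 50 ms.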

Core Technique #5: Historical→Live Detection

For stdin (pipe from tail -f)

Use poll() to detect idle periods:

rust
const STDIN_POLL_TIMEOUT: Duration = Duration::from_millis(100);
const STDIN_NO_DATA_STREAK_THRESHOLD: u32 = 30;  // 30 × 100ms = 3 seconds
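The constants imply a streak counter: each poll() that times out without data extends the streak, any data resets it, and 30 consecutive empty polls (about 3 s) trigger the switch to live mode. A hypothetical std-only sketch of just that counter; the poll() call itself (for example libc::poll on fd 0 with STDIN_POLL_TIMEOUT) is left out:

```rust
use std::time::Duration;

const STDIN_POLL_TIMEOUT: Duration = Duration::from_millis(100);
const STDIN_NO_DATA_STREAK_THRESHOLD: u32 = 30;

/// Tracks consecutive empty polls (hypothetical helper).
#[derive(Default)]
struct IdleDetector {
    empty_streak: u32,
    live: bool,
}

impl IdleDetector {
    /// Feed the result of one poll() with STDIN_POLL_TIMEOUT.
    /// Returns true exactly once: on the poll that completes the idle streak.
    fn on_poll(&mut self, got_data: bool) -> bool {
        if self.live {
            return false;
        }
        if got_data {
            self.empty_streak = 0; // any data restarts the idle window
            return false;
        }
        self.empty_streak += 1;
        if self.empty_streak >= STDIN_NO_DATA_STREAK_THRESHOLD {
            self.live = true;
            return true;
        }
        false
    }

    /// Idle time needed before switching: 30 × 100 ms = 3 s.
    fn idle_window() -> Duration {
        STDIN_POLL_TIMEOUT * STDIN_NO_DATA_STREAK_THRESHOLD
    }
}
```

Requiring consecutive empty polls means a brief pause inside a backlog (for example while tail -f catches up) does not flip the ingestor into live mode early.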

For file (direct read)

In file-follow mode, treat EOF as transition to live and keep reading as the file grows.
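One detail the read-past-EOF loop has to handle is a line the writer has only partly flushed: bytes read at EOF may end mid-line and must be held back until the newline arrives. A hypothetical std-only sketch (`Follower` is not from the reference implementation); it relies on the fact that a read() on a regular file returns 0 at EOF and later returns whatever was appended, so the same File handle can be polled again after a short sleep:

```rust
use std::fs::File;
use std::io::{self, Read};

/// Follows a growing file, yielding only complete lines (hypothetical helper).
struct Follower {
    file: File,
    pending: Vec<u8>, // bytes after the last '\n' seen so far
}

impl Follower {
    fn new(file: File) -> Self {
        Follower { file, pending: Vec::new() }
    }

    /// Reads everything appended since the last call and returns the complete
    /// lines (without '\n'). An empty result means "at EOF, try again later".
    fn poll_lines(&mut self) -> io::Result<Vec<Vec<u8>>> {
        let mut chunk = [0u8; 64 * 1024];
        let mut lines = Vec::new();
        loop {
            let n = self.file.read(&mut chunk)?;
            if n == 0 {
                return Ok(lines); // EOF for now; caller sleeps and retries
            }
            self.pending.extend_from_slice(&chunk[..n]);
            // Split off every complete line; keep the unterminated tail.
            while let Some(pos) = self.pending.iter().position(|&b| b == b'\n') {
                let rest = self.pending.split_off(pos + 1);
                let mut line = std::mem::replace(&mut self.pending, rest);
                line.pop(); // drop the '\n'
                lines.push(line);
            }
        }
    }
}
```

A real loop would hand lines to the inserter in batches instead of collecting a whole backlog in one Vec. If the bytes read so far are also counted, that total minus `pending.len()` is the safe --start-offset for an exec() handoff.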

Notes

  • In file-follow mode, once historical ingestion finishes, the ingestor can exec() into a fresh live process at an exact byte offset (--start-offset) to force RSS down.
  • This ingestor does not implement tail -F (log rotation / truncation handling). If the file is truncated/replaced while following, restart (or use a pipe from tail -F and accept that exec() isn’t available in stdin mode).

Core Technique #6: Deferred Index Creation

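Indexes slow down inserts, because every row must also update each index B-tree. Create them once, after historical ingestion completes; building an index over rows that already exist is cheaper than maintaining it row by row.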

Index Definitions (Allowlist)

rust
fn create_indexes(conn: &Connection) -> Result<()> {
    conn.execute_batch(
        "CREATE INDEX IF NOT EXISTS idx_sse_flow_id ON sse_lines(flow_id);\n\
         CREATE INDEX IF NOT EXISTS idx_sse_sid ON sse_lines(sid);\n\
         CREATE INDEX IF NOT EXISTS idx_sse_t ON sse_lines(t);\n\
         CREATE INDEX IF NOT EXISTS idx_sse_meta_pid ON sse_lines(meta_pid);\n\
         CREATE INDEX IF NOT EXISTS idx_sse_meta_cwd ON sse_lines(meta_cwd);\n\
         CREATE INDEX IF NOT EXISTS idx_sse_round ON sse_lines(round);\n\
         CREATE INDEX IF NOT EXISTS idx_sse_meta_turn_id ON sse_lines(meta_turn_id);"
    )?;
    Ok(())
}

Index Allowlist (important!)

Indexing large TEXT columns such as line and metadata can sharply reduce write throughput and inflate RSS (page splits and many dirty pages), especially if you ever re-ingest from the start into an existing DB. The ingestor:

  • Creates only the 7 “hot” indexes above
  • Drops any other existing idx_sse_* indexes on startup
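The startup cleanup amounts to a set difference between the existing idx_sse_* indexes and the allowlist. A std-only sketch of that step; `stale_index_drops` is a hypothetical helper, and the existing names would come from a query such as `SELECT name FROM sqlite_master WHERE type = 'index' AND name GLOB 'idx_sse_*'` (GLOB rather than LIKE, because `_` is a single-character wildcard in LIKE):

```rust
/// The seven indexes created by create_indexes().
const INDEX_ALLOWLIST: [&str; 7] = [
    "idx_sse_flow_id",
    "idx_sse_sid",
    "idx_sse_t",
    "idx_sse_meta_pid",
    "idx_sse_meta_cwd",
    "idx_sse_round",
    "idx_sse_meta_turn_id",
];

/// Returns one DROP statement per existing idx_sse_* index not in the allowlist.
/// Indexes with other prefixes (user-created, SQLite autoindexes) are left alone.
fn stale_index_drops(existing: &[&str]) -> Vec<String> {
    existing
        .iter()
        .filter(|name| name.starts_with("idx_sse_") && !INDEX_ALLOWLIST.contains(*name))
        .map(|name| format!("DROP INDEX IF EXISTS \"{name}\";"))
        .collect()
}
```

Joining the returned statements gives a batch that can be run once at startup, before any inserts.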

Also note: this ingestor does not dedupe. Re-running from the beginning without --from-scratch will append duplicates and can be much slower; prefer --start-at-end (or --start-offset) for ongoing ingestion.


Core Technique #7: Release Build Optimizations

toml
[profile.release]
lto = "fat"
codegen-units = 1
opt-level = 3
panic = "abort"
strip = true

Core Technique #8: Large Read Buffer

rust
const BUF_CAP_BYTES: usize = 8 * 1024 * 1024;  // 8MB buffer
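A sketch of how such a buffer is typically used with the standard library: `BufReader::with_capacity` for large reads, plus one reusable line Vec so the hot loop does not allocate per line. `count_lines` is a hypothetical helper; keep in mind that the buffer counts toward RSS once its pages are filled, which matters against a ~15MB live budget:

```rust
use std::io::{self, BufRead, BufReader, Read};

const BUF_CAP_BYTES: usize = 8 * 1024 * 1024; // 8MB buffer

/// Counts lines (including an unterminated last one), reusing one Vec throughout.
fn count_lines<R: Read>(src: R) -> io::Result<u64> {
    let mut reader = BufReader::with_capacity(BUF_CAP_BYTES, src);
    let mut line = Vec::with_capacity(64 * 1024);
    let mut count = 0;
    loop {
        line.clear(); // keeps the allocation, drops the contents
        if reader.read_until(b'\n', &mut line)? == 0 {
            return Ok(count);
        }
        // ...byte-scan field extraction on `line` would go here...
        count += 1;
    }
}
```

read_until copies each line out of the reader's buffer; a fully zero-copy variant scans `fill_buf()` directly with memchr and copies only lines that straddle a buffer boundary, at the cost of more bookkeeping.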

Dependencies

toml
[dependencies]
memchr = "2.7"
rusqlite = "0.31"
clap = "4.5"
anyhow = "1.0"
libc = "0.2"
sysinfo = "0.30"
ahash = "0.8"