Rust SQLite Ingestor (Low RAM Live Mode)
[Created by Codex: 019bef79-6ce1-7240-8260-a3f593daa9b6 2026-01-24]
Build or tune a Rust-based SQLite ingestor for massive JSONL streams with:
- Historical mode: maximize throughput for bulk backfills
- Live mode: enforce a strict RSS budget (prefer a hard reset via exec() when possible)
Use Cases
- Implement high-throughput ingestion of sse_lines.jsonl (Codex and Claude) into SQLite.
- Debug or improve an existing ingestor that is too slow (JSON parsing, per-line commits, too many indexes).
- Enforce low live-mode RAM even during bursty backfill periods.
Workflow (Recommended)
- Define the envelope schema and keep row writes simple (avoid per-event state machines unless strictly needed).
- Implement byte-scan extraction (no serde_json per line).
- Separate historical and live modes:
  - Historical: throughput-first PRAGMAs + larger cache + large transactions
  - Live: safety + strict RSS PRAGMAs + small cache + smaller transactions
- Restrict indexes to a short allowlist; avoid indexing large TEXT fields.
- Implement a reliable historical→live transition:
  - File follow: prefer exec() into a fresh live process at a byte offset to guarantee that RSS drops.
  - Stdin follow: use poll() idle detection; accept that an exec() restart is not available.
- Validate the transition with a “3M then append 3M” stress test before declaring success.
Reference (Main Meat)
This section is adapted from /tmp/knowledge-base/high-performance-sqlite-ingestor.md and describes the concrete techniques used in the reference implementation.
High-Performance SQLite Ingestor: Technical Guide
[Created by Claude: ab7e3fa0-6c2e-49c0-a55f-21779f70b64a 2026-01-25] [Edited by Codex: 019bef79-6ce1-7240-8260-a3f593daa9b6 2026-01-24]
A deep dive into the techniques sqlite-rust-ingestor uses to reach roughly 450k-570k lines/sec in historical mode and ~15MB RSS in steady-state live mode.
Performance Results
| Mode | Throughput | RAM Usage |
|---|---|---|
| Historical | 450k-570k lines/sec | ~1–6GB (configurable) |
| Live (idle/steady-state) | Real-time streaming | ~15MB |
| Live (burst backfill) | Depends on commit/index settings | Can spike into 100s of MB |
RSS definition: The tool reports RSS via sysinfo (Process::memory()), i.e. resident set size (physical RAM currently held by the process). Cross-check with ps -o rss= -p <pid> or Activity Monitor.
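On Linux, there is also a dependency-free cross-check: the kernel's own VmRSS line in /proc/self/status, reported in kB. The sketch below is an illustrative helper and not part of the tool, which uses sysinfo. The names parse_vm_rss_kib and current_rss_kib are hypothetical.

```rust
use std::fs;

/// Extracts the resident set size, in KiB, from the text of a Linux
/// `/proc/<pid>/status` file (line format: `VmRSS:    15360 kB`).
pub fn parse_vm_rss_kib(status: &str) -> Option<u64> {
    status
        .lines()
        .find_map(|line| line.strip_prefix("VmRSS:"))
        .and_then(|rest| rest.split_whitespace().next())
        .and_then(|number| number.parse().ok())
}

/// RSS of the current process in KiB; None on non-Linux systems or on read errors.
pub fn current_rss_kib() -> Option<u64> {
    let status = fs::read_to_string("/proc/self/status").ok()?;
    parse_vm_rss_kib(&status)
}
```

Logging current_rss_kib() just before and just after the historical→live switch makes it easy to see whether RSS actually dropped.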
Architecture Overview
┌─────────────────────────────────────────────────────────────────┐
│                           INPUT LAYER                           │
│ ┌─────────────┐    ┌─────────────┐                              │
│ │    stdin    │    │    file     │  (with --follow support)     │
│ │   (pipe)    │    │   reader    │                              │
│ └──────┬──────┘    └──────┬──────┘                              │
│        │                  │                                     │
│        └────────┬─────────┘                                     │
│                 ▼                                               │
│         ┌───────────────┐                                       │
│         │ StreamReader  │  stdin: poll()-based idle detection   │
│         │  (reader.rs)  │  file:  EOF + --follow loop           │
│         └───────┬───────┘                                       │
└─────────────────┼───────────────────────────────────────────────┘
                  │
                  ▼ raw bytes (no JSON parsing!)
┌─────────────────────────────────────────────────────────────────┐
│                          PARSER LAYER                           │
│ ┌───────────────────────────────────────┐                       │
│ │ Byte-level field extraction           │                       │
│ │ • memchr::memmem for needle search    │                       │
│ │ • Zero-copy &[u8] slices              │                       │
│ │ • No serde_json, no allocation        │                       │
│ └───────────────────────────────────────┘                       │
└─────────────────┬───────────────────────────────────────────────┘
                  │
                  ▼ extracted fields as &str slices
┌─────────────────────────────────────────────────────────────────┐
│                          SQLITE LAYER                           │
│ ┌───────────────────────────────────────┐                       │
│ │ Ingestor (sqlite.rs)                  │                       │
│ │ • Mode-specific PRAGMAs               │                       │
│ │ • Batch commits (50k hist / 5k live)  │                       │
│ │ • Deferred index creation             │                       │
│ │ • Drop unwanted indexes on startup    │                       │
│ │ • PRAGMA shrink_memory (best-effort)  │                       │
│ └───────────────────────────────────────┘                       │
└─────────────────────────────────────────────────────────────────┘
Core Technique #1: No JSON Parsing
The biggest performance win comes from never parsing JSON. Instead, use memchr/memmem for byte-level needle search.
The Problem with JSON Parsing
// SLOW: a full JSON parse builds a Value tree (map + owned strings), validates syntax, handles escapes
let obj: serde_json::Value = serde_json::from_slice(line)?;
let event = obj["event"].as_str(); // Multiple allocations per line
The Solution: Needle Search
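// FAST: Find byte pattern directly, return slice (zero allocation)
use memchr::memmem;
pub const EVENT_NEEDLE: &[u8] = b"\"event\":\"";
/// Returns the raw (still JSON-escaped) bytes of the string value that follows `needle`.
#[inline]
pub fn find_top_level_json_string_bytes<'a>(line: &'a [u8], needle: &[u8]) -> Option<&'a [u8]> {
    // SIMD-accelerated substring search; returns the first occurrence in the line
    let idx = memmem::find(line, needle)?;
    let mut i = idx + needle.len();
    let start = i;
    let mut escaped = false;
    while i < line.len() {
        // SAFETY: the loop condition guarantees i < line.len()
        let b = unsafe { *line.get_unchecked(i) };
        if escaped {
            // Previous byte was a backslash: skip this byte, whatever it is
            escaped = false;
            i += 1;
            continue;
        }
        if b == b'\\' {
            escaped = true;
            i += 1;
            continue;
        }
        if b == b'"' {
            return Some(&line[start..i]); // Zero-copy slice!
        }
        i += 1;
    }
    None // No closing quote on this line
}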
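You can try the same scan without the memchr dependency. This std-only sketch swaps memmem::find for a naive windows() search and rewrites the loop with safe indexing. It gives the same result for a non-empty needle, but without SIMD, so it is for illustration only. The name find_string_value is hypothetical. When memchr is available, build a memmem::Finder once per needle and reuse it across lines, which avoids re-processing the needle on every call.

```rust
/// Std-only sketch of the needle scan: a naive `windows()` search stands in for
/// `memchr::memmem::find`. Returns the raw (still-escaped) bytes between the
/// needle and the next unescaped `"`, borrowed from `line` without copying.
pub fn find_string_value<'a>(line: &'a [u8], needle: &[u8]) -> Option<&'a [u8]> {
    if needle.is_empty() {
        return None; // windows(0) would panic
    }
    let idx = line.windows(needle.len()).position(|w| w == needle)?;
    let start = idx + needle.len();
    let mut escaped = false;
    for (offset, &b) in line[start..].iter().enumerate() {
        if escaped {
            escaped = false; // this byte was escaped by a preceding backslash
        } else if b == b'\\' {
            escaped = true;
        } else if b == b'"' {
            return Some(&line[start..start + offset]); // zero-copy slice
        }
    }
    None // unterminated string value
}
```

For example, given `{"event":"a\"b"}` and the event needle, the function returns the four raw bytes `a\"b`. Unescaping, if it is ever needed, is left to the consumer.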
Key Points
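- memmem::find uses SIMD-accelerated substring search where the CPU supports it
- Returns a &[u8] slice: no allocation; it points directly into the input buffer (the bytes are still JSON-escaped)
- unsafe { *line.get_unchecked(i) }: skips the bounds check in the hot loop, which is sound because the loop condition already guarantees i < line.len()
- Handles escapes: correctly finds the closing " even when the value contains \"
- memmem::find returns the first match anywhere in the line, so "top level" only holds if the key appears at the top level before any nested occurrence of the same needle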
Pre-defined Needles
pub const EVENT_NEEDLE: &[u8] = b"\"event\":\"";
pub const T_NEEDLE: &[u8] = b"\"t\":\"";
pub const SID_NEEDLE: &[u8] = b"\"sid\":\"";
pub const METADATA_NEEDLE: &[u8] = b"\"metadata\":\"";
pub const FLOW_ID_NEEDLE: &[u8] = b"\"flow_id\":\"";
// For escaped nested JSON (metadata contains escaped JSON string)
pub const ESC_META_PID_NEEDLE: &[u8] = b"\\\"pid\\\":";
pub const ESC_META_CWD_NEEDLE: &[u8] = b"\\\"cwd\\\":\\\"";
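The escaped needles match inside the metadata string, where a nested object appears as `{\"pid\":123,...}`. ESC_META_PID_NEEDLE stops at the colon because pid is a number, so the scan after it reads digits instead of looking for a closing quote. Below is a std-only sketch of that step. find_escaped_u64 is a hypothetical helper, and a naive windows() search stands in for memmem. Like the needles themselves, it assumes compact JSON with no whitespace after the colon.

```rust
pub const ESC_META_PID_NEEDLE: &[u8] = b"\\\"pid\\\":";

/// Finds `needle` and parses the unsigned decimal integer immediately after it.
/// Returns None if the needle is absent, no digit follows, or the value overflows u64.
pub fn find_escaped_u64(line: &[u8], needle: &[u8]) -> Option<u64> {
    if needle.is_empty() {
        return None;
    }
    let idx = line.windows(needle.len()).position(|w| w == needle)?;
    let rest = &line[idx + needle.len()..];
    let mut value: u64 = 0;
    let mut seen_digit = false;
    for &b in rest.iter().take_while(|b| b.is_ascii_digit()) {
        // checked arithmetic: a malformed huge number yields None instead of wrapping
        value = value.checked_mul(10)?.checked_add(u64::from(b - b'0'))?;
        seen_digit = true;
    }
    seen_digit.then_some(value)
}
```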
Core Technique #2: Mode-Specific SQLite PRAGMAs
Historical Mode (Speed Over Safety)
fn apply_mode_pragmas(&mut self, historical: bool) -> Result<()> {
    if historical {
        self.conn.execute_batch(
            "PRAGMA journal_mode=OFF;\n\
             PRAGMA synchronous=OFF;\n\
             PRAGMA temp_store=FILE;\n\
             PRAGMA mmap_size=0;",
        )?;
        // Large cache for historical: ~1GB default
        let kb = self.cfg.historical_cache_mb.saturating_mul(1024);
        self.conn.execute_batch(&format!("PRAGMA cache_size = -{};", kb))?;
    }
    // ...
}
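| PRAGMA | Meaning | Effect |
|---|---|---|
| journal_mode=OFF | Disable journaling | No crash recovery, max speed |
| synchronous=OFF | No fsync | OS handles durability |
| temp_store=FILE | Temporary tables/indices in files | Keeps temp data off the heap |
| mmap_size=0 | No memory-mapped I/O | DB file pages are not mapped into the process (or its RSS) |
| cache_size=-1048576 | ~1GB cache (negative value = KiB) | Keep hot pages in memory |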
Live Mode (Safety + Low RAM)
self.conn.execute_batch(
    "PRAGMA journal_mode=WAL;\n\
     PRAGMA synchronous=NORMAL;\n\
     PRAGMA temp_store=FILE;\n\
     PRAGMA mmap_size=0;\n\
     PRAGMA wal_autocheckpoint=1000;",
)?;
// Small cache for live: ~16MB default
let kb = self.cfg.live_cache_mb.saturating_mul(1024);
self.conn.execute_batch(&format!("PRAGMA cache_size = -{};", kb))?;
| PRAGMA | Meaning | Effect |
|---|---|---|
| journal_mode=WAL | Write-ahead log | Concurrent reads, crash safe |
| synchronous=NORMAL | Sync on checkpoint | Balance of safety/speed |
| cache_size=-16384 | ~16MB cache (negative value = KiB) | Minimal memory footprint |
| wal_autocheckpoint=1000 | Checkpoint every ~1000 WAL pages (SQLite's default) | Keep WAL file small |
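You can generate both PRAGMA sets from one place, which makes the mode switch easy to audit. This is a std-only sketch: Mode and pragma_batch are hypothetical names. The returned string is the kind of batch you would pass to rusqlite's Connection::execute_batch. The cache arithmetic follows the tables above, since SQLite reads a negative cache_size as KiB rather than pages.

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Mode {
    Historical,
    Live,
}

/// Builds the PRAGMA batch for `mode`; `cache_mb` becomes `cache_size = -(cache_mb * 1024)`.
pub fn pragma_batch(mode: Mode, cache_mb: u64) -> String {
    let base = match mode {
        // Throughput first: no journal and no fsync, so a crash can corrupt the DB.
        Mode::Historical => concat!(
            "PRAGMA journal_mode=OFF;\n",
            "PRAGMA synchronous=OFF;\n",
            "PRAGMA temp_store=FILE;\n",
            "PRAGMA mmap_size=0;\n",
        ),
        // Crash-safe WAL plus a small cache to keep RSS low.
        Mode::Live => concat!(
            "PRAGMA journal_mode=WAL;\n",
            "PRAGMA synchronous=NORMAL;\n",
            "PRAGMA temp_store=FILE;\n",
            "PRAGMA mmap_size=0;\n",
            "PRAGMA wal_autocheckpoint=1000;\n",
        ),
    };
    format!("{}PRAGMA cache_size = -{};", base, cache_mb.saturating_mul(1024))
}
```

SQLite does not allow journal_mode to change while a transaction is active. If both modes share a connection, apply the live batch only after the final historical COMMIT.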
Core Technique #3: RAM Release on Transition
The critical moment is the historical→live transition. Without explicit action, the process retains GBs of memory.
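The shrink_memory PRAGMA
PRAGMA shrink_memory asks SQLite to free as much heap memory as the connection can release. It is best-effort. Apply the live cache_size first, or the cache can grow back to its historical size. Also, the allocator may keep the freed pages instead of returning them to the OS, so RSS is not guaranteed to drop.
pub fn shrink_for_live_mode(&mut self) -> Result<()> {
    self.conn.execute_batch("PRAGMA shrink_memory;")?;
    Ok(())
}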
The Nuclear Option: exec() into Fresh Process
For guaranteed RAM release, the ingestor can exec() into a fresh copy of itself (file-follow only):
fn exec_into_live(args: &Args, start_offset: u64) -> Result<()> {
    use std::os::unix::process::CommandExt;
    // Commit and drop the SQLite connection before calling this:
    // exec() discards all in-memory state, including uncommitted rows.
    let exe = std::env::current_exe()?;
    let mut cmd = std::process::Command::new(exe);
    cmd.arg("--input").arg(&args.input)
        .arg("--follow")
        .arg("--start-offset").arg(start_offset.to_string())
        .arg("--db").arg(&args.db)
        .arg("--live-cache-mb").arg(args.live_cache_mb.to_string())
        .arg("--no-exec-live"); // Prevent infinite exec loop
    // exec() replaces the current process image and never returns on success
    let err = cmd.exec();
    Err(anyhow::Error::new(err))
}
Why this works: A fresh process starts with minimal RSS. The allocator and SQLite cache from historical mode are completely gone.
Core Technique #4: Efficient Batch Commits
// FAST: batch many inserts per transaction instead of committing every line
const HISTORICAL_MAX_INSERTS_PER_TX: u64 = 50_000;
const LIVE_MAX_INSERTS_PER_TX: u64 = 5_000;
// Also time-based commits to bound latency
const HISTORICAL_COMMIT_MS: u64 = 1_000;
const LIVE_COMMIT_MS: u64 = 50;
// In the ingest loop; commit_interval is a Duration, e.g. Duration::from_millis(LIVE_COMMIT_MS)
if inserts_since_commit >= max_inserts_per_tx
    || last_commit.elapsed() >= commit_interval
{
    ingestor.flush_and_commit()?;
    last_commit = Instant::now();
    inserts_since_commit = 0;
}
| Mode | Max Inserts/TX | Max Time Between Commits |
|---|---|---|
| Historical | 50,000 | 1 second |
| Live | 5,000 | 50ms |
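You can wrap the count-or-time rule in a small, testable policy object. This is a sketch with hypothetical names (CommitPolicy, record_insert, record_commit), and the thresholds come from the table above. One caveat of this pattern: if the check only runs when a line arrives, a quiet live stream leaves its last rows uncommitted until the next line. The idle path should therefore commit as well.

```rust
use std::time::{Duration, Instant};

/// Signals a COMMIT when either the row budget or the time budget of the open transaction is used up.
pub struct CommitPolicy {
    max_inserts: u64,
    interval: Duration,
    inserts_since_commit: u64,
    last_commit: Instant,
}

impl CommitPolicy {
    pub fn new(max_inserts: u64, interval: Duration) -> Self {
        Self { max_inserts, interval, inserts_since_commit: 0, last_commit: Instant::now() }
    }

    pub fn historical() -> Self {
        Self::new(50_000, Duration::from_millis(1_000))
    }

    pub fn live() -> Self {
        Self::new(5_000, Duration::from_millis(50))
    }

    /// Call after each insert; returns true when the caller should COMMIT now.
    pub fn record_insert(&mut self) -> bool {
        self.inserts_since_commit += 1;
        self.inserts_since_commit >= self.max_inserts || self.last_commit.elapsed() >= self.interval
    }

    /// Call after a successful COMMIT to reset both budgets.
    pub fn record_commit(&mut self) {
        self.inserts_since_commit = 0;
        self.last_commit = Instant::now();
    }
}
```

In the loop, this reduces to `if policy.record_insert() { ingestor.flush_and_commit()?; policy.record_commit(); }`.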
Core Technique #5: Historical→Live Detection
For stdin (pipe from tail -f)
Use poll() to detect idle periods:
const STDIN_POLL_TIMEOUT: Duration = Duration::from_millis(100);
const STDIN_NO_DATA_STREAK_THRESHOLD: u32 = 30; // 30 × 100ms = 3 seconds
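The streak logic is independent of poll() itself. Each poll with a 100 ms timeout either reports readable data or times out, and 30 consecutive timeouts (about 3 s of silence) flip the reader to live mode. Below is a std-only sketch of just that counter. IdleDetector and PollOutcome are hypothetical. In the real reader, each outcome would come from poll() on stdin (for example via libc::poll) with STDIN_POLL_TIMEOUT.

```rust
use std::time::Duration;

const STDIN_POLL_TIMEOUT: Duration = Duration::from_millis(100);
const STDIN_NO_DATA_STREAK_THRESHOLD: u32 = 30; // 30 × 100ms = 3 seconds

/// What one poll() call on stdin reported.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PollOutcome {
    Readable,
    TimedOut,
}

/// Counts consecutive idle polls; data resets the streak. The switch to live is one-way.
pub struct IdleDetector {
    streak: u32,
    live: bool,
}

impl IdleDetector {
    pub fn new() -> Self {
        Self { streak: 0, live: false }
    }

    /// Feeds one poll result; returns true exactly once, on the historical→live transition.
    pub fn observe(&mut self, outcome: PollOutcome) -> bool {
        match outcome {
            PollOutcome::Readable => self.streak = 0,
            PollOutcome::TimedOut => self.streak = self.streak.saturating_add(1),
        }
        if !self.live && self.streak >= STDIN_NO_DATA_STREAK_THRESHOLD {
            self.live = true;
            return true;
        }
        false
    }

    /// Minimum silence needed before the switch (timeout × threshold).
    pub fn idle_time_to_switch() -> Duration {
        STDIN_POLL_TIMEOUT * STDIN_NO_DATA_STREAK_THRESHOLD
    }
}
```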
For file (direct read)
In file-follow mode, treat EOF as transition to live and keep reading as the file grows.
Notes
- In file-follow mode, once historical ingestion finishes, the ingestor can exec() into a fresh live process at an exact byte offset (--start-offset) to force RSS down.
- This ingestor does not implement tail -F (log rotation / truncation handling). If the file is truncated or replaced while following, restart the ingestor (or pipe from tail -F and accept that exec() isn't available in stdin mode).
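Any offset handed to --start-offset should point just past a newline; otherwise the new process starts mid-record. Below is a std-only sketch of a reader that:

- resumes from a byte offset,
- hands out only complete lines,
- tracks the offset just past the last complete line,
- returns None at EOF, so a follow loop can sleep and retry.

LineFollower and its methods are hypothetical names. It works over any Read + Seek, which means a File in practice and a Cursor in tests. Like the ingestor, it does not handle truncation or rotation.

```rust
use std::io::{self, BufRead, BufReader, Read, Seek, SeekFrom};

/// Reads newline-terminated records starting at a byte offset. A trailing partial
/// line (no '\n' yet) is held back, so `committed_offset()` always sits on a line boundary.
pub struct LineFollower<R> {
    reader: BufReader<R>,
    committed: u64,   // offset just past the last complete line handed out
    pending: Vec<u8>, // bytes of a line whose '\n' has not arrived yet
}

impl<R: Read + Seek> LineFollower<R> {
    pub fn open_at(mut inner: R, start_offset: u64) -> io::Result<Self> {
        inner.seek(SeekFrom::Start(start_offset))?;
        Ok(Self { reader: BufReader::new(inner), committed: start_offset, pending: Vec::new() })
    }

    /// Returns the next complete line without its '\n', or None at (current) EOF.
    pub fn next_line(&mut self) -> io::Result<Option<Vec<u8>>> {
        let n = self.reader.read_until(b'\n', &mut self.pending)?;
        if n == 0 || self.pending.last() != Some(&b'\n') {
            return Ok(None); // EOF; a partial line stays in `pending` until the writer finishes it
        }
        self.committed += self.pending.len() as u64;
        let mut line = std::mem::take(&mut self.pending);
        line.pop(); // drop the '\n'
        Ok(Some(line))
    }

    /// A safe value to pass as --start-offset to a fresh process.
    pub fn committed_offset(&self) -> u64 {
        self.committed
    }
}
```

Only hand committed_offset() to exec_into_live after the SQLite transaction holding those lines has been committed.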
Core Technique #6: Deferred Index Creation
Indexes slow down inserts. Create them after historical ingestion completes.
Index Definitions (Allowlist)
fn create_indexes(conn: &Connection) -> Result<()> {
    conn.execute_batch(
        "CREATE INDEX IF NOT EXISTS idx_sse_flow_id ON sse_lines(flow_id);\n\
         CREATE INDEX IF NOT EXISTS idx_sse_sid ON sse_lines(sid);\n\
         CREATE INDEX IF NOT EXISTS idx_sse_t ON sse_lines(t);\n\
         CREATE INDEX IF NOT EXISTS idx_sse_meta_pid ON sse_lines(meta_pid);\n\
         CREATE INDEX IF NOT EXISTS idx_sse_meta_cwd ON sse_lines(meta_cwd);\n\
         CREATE INDEX IF NOT EXISTS idx_sse_round ON sse_lines(round);\n\
         CREATE INDEX IF NOT EXISTS idx_sse_meta_turn_id ON sse_lines(meta_turn_id);",
    )?;
    Ok(())
}
Index Allowlist (important!)
Indexing large TEXT columns like line and metadata can destroy write throughput and blow up RSS (page splits + dirty pages), especially if you ever re-ingest from the start into an existing DB. The ingestor:
- Creates only the 7 “hot” indexes above
- Drops any other existing idx_sse_* indexes on startup
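The startup cleanup amounts to listing the existing indexes whose names start with idx_sse_ and dropping every one not on the allowlist. This sketch shows only the pure filtering step, and cleanup_statements is a hypothetical name. It assumes the names come from a query such as `SELECT name FROM sqlite_master WHERE type = 'index' AND name LIKE 'idx\_sse\_%' ESCAPE '\'`. The ESCAPE clause matters because an unescaped `_` in LIKE matches any single character.

```rust
/// The seven "hot" indexes that survive startup (names from create_indexes above).
pub const INDEX_ALLOWLIST: [&str; 7] = [
    "idx_sse_flow_id",
    "idx_sse_sid",
    "idx_sse_t",
    "idx_sse_meta_pid",
    "idx_sse_meta_cwd",
    "idx_sse_round",
    "idx_sse_meta_turn_id",
];

/// Given the index names currently in the DB, returns DROP statements for every
/// `idx_sse_*` index that is not allowlisted. Indexes with other prefixes are left alone.
pub fn cleanup_statements(existing: &[&str]) -> Vec<String> {
    existing
        .iter()
        .copied()
        .filter(|name| name.starts_with("idx_sse_") && !INDEX_ALLOWLIST.contains(name))
        // Quote the identifier, doubling any embedded double quotes
        .map(|name| format!("DROP INDEX IF EXISTS \"{}\";", name.replace('"', "\"\"")))
        .collect()
}
```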
Also note: this ingestor does not dedupe. Re-running from the beginning without --from-scratch will append duplicates and can be much slower; prefer --start-at-end (or --start-offset) for ongoing ingestion.
Core Technique #7: Release Build Optimizations
[profile.release]
lto = "fat"
codegen-units = 1
opt-level = 3
panic = "abort"
strip = true
Core Technique #8: Large Read Buffer
const BUF_CAP_BYTES: usize = 8 * 1024 * 1024; // 8MB buffer
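A large buffer pays off most when paired with a reused line buffer: the hot loop then neither allocates per line nor issues many small read syscalls. Below is a std-only sketch. It assumes lines reach the parser as &[u8]; for_each_line is a hypothetical name, and the 64 KiB starting capacity of the line buffer is an arbitrary guess. The 8MB buffer itself counts toward RSS once its pages are touched, which is worth remembering against a ~15MB live budget.

```rust
use std::io::{self, BufRead, BufReader, Read};

const BUF_CAP_BYTES: usize = 8 * 1024 * 1024; // 8MB buffer

/// Streams newline-delimited records through `handle`, reusing one line buffer.
/// Returns the number of lines processed; the trailing '\n' is stripped.
pub fn for_each_line<R: Read, F: FnMut(&[u8])>(input: R, mut handle: F) -> io::Result<u64> {
    let mut reader = BufReader::with_capacity(BUF_CAP_BYTES, input);
    let mut line = Vec::with_capacity(64 * 1024); // initial guess; grows if a line is longer
    let mut count = 0;
    loop {
        line.clear(); // keep the allocation, drop the contents
        if reader.read_until(b'\n', &mut line)? == 0 {
            return Ok(count); // EOF
        }
        let end = if line.last() == Some(&b'\n') { line.len() - 1 } else { line.len() };
        handle(&line[..end]);
        count += 1;
    }
}
```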
Dependencies
[dependencies]
memchr = "2.7"
rusqlite = "0.31"
clap = "4.5"
anyhow = "1.0"
libc = "0.2"
sysinfo = "0.30"
ahash = "0.8"