import-conversation-logs
Lane: ops
Version: 0.1.0
Owner: system
Schedule: 23:28 PT daily
Purpose
Ingest conversations from multiple AI chat platforms into the unified messages_v2 table. This is the foundation of the memory pipeline - all other skills depend on this data.
Sources
| Platform | Inbox Path | Format |
|---|---|---|
| ChatGPT | /cosmocrat/inbox/raw/chatgpt/ | OpenAI export JSON |
| Claude | /cosmocrat/inbox/raw/claude/ | Anthropic export JSON |
| Gemini | /cosmocrat/inbox/raw/gemini/ | Google export JSON |
| Cursor | /cosmocrat/inbox/raw/cursor/ | SQLite global_state.vscdb |
| Codex | /cosmocrat/inbox/raw/codex/ | JSON files |
| Grok | /cosmocrat/inbox/raw/grok/ | JSON files |
Memory Contract
Writes
| Type | Table | Retention | Description |
|---|---|---|---|
| conversation_message | messages_v2 | permanent | All chat messages |
| conversation_topic | conversation_topics | permanent | Extracted topics |
Access Mode
service_only - All writes go through MemoryClient. No direct ClickHouse access.
PII Handling
- Mode: redact
- Fields: content
- Uses app/tools/pii_utils.py for redaction
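The redact mode can be pictured with a minimal sketch. The API of app/tools/pii_utils.py is not shown in this manifest, so everything below is a hypothetical stand-in: the function name, the email-only pattern, and the `[REDACTED_EMAIL]` placeholder are assumptions for illustration, not the project's actual rules.

```python
import re

# Assumption: a single email pattern stands in for the real PII rules.
EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")

# Only the fields listed under "PII Handling" are redacted.
REDACT_FIELDS = ("content",)


def redact_record(record: dict) -> dict:
    """Return a copy of record with email addresses masked in the redacted fields."""
    redacted = dict(record)
    for field in REDACT_FIELDS:
        value = redacted.get(field)
        if isinstance(value, str):
            redacted[field] = EMAIL_RE.sub("[REDACTED_EMAIL]", value)
    return redacted
```

Fields outside REDACT_FIELDS pass through unchanged, which mirrors the field list above.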
Provenance
Every ingested message includes:
| Field | Source | Example |
|---|---|---|
| run_id | Generated | ingest_2025-12-22_a1b2c3d4 |
| trace_id | Langfuse or run_id | trace_xyz789 |
| source_ref | Platform:path | chatgpt:/cosmocrat/inbox/raw/chatgpt/web/2025-12-22/export.json |
| actor | Fixed | import-conversation-logs |
| intent | Optional | daily_conversation_ingest |
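As a rough illustration of how the fields in this table could be derived, the sketch below builds them with the standard library only. `ProvenanceFields` and `build_provenance` are hypothetical names, not the real Provenance type, and the fallback of trace_id to run_id is an assumption read from the "Langfuse or run_id" entry.

```python
import os
from dataclasses import dataclass
from datetime import date
from typing import Optional
from uuid import uuid4


@dataclass(frozen=True)
class ProvenanceFields:
    """Hypothetical stand-in for the real Provenance type."""
    run_id: str
    trace_id: str
    source_ref: str
    actor: str = "import-conversation-logs"  # fixed for this skill
    intent: Optional[str] = None


def build_provenance(platform: str, path: str, run_date: date,
                     intent: Optional[str] = None) -> ProvenanceFields:
    # run_id: "ingest_<ISO date>_<8 hex chars>", matching the table's example.
    run_id = f"ingest_{run_date.isoformat()}_{uuid4().hex[:8]}"
    # Assumption: use the Langfuse trace when set, otherwise fall back to run_id.
    trace_id = os.environ.get("LANGFUSE_TRACE_ID") or run_id
    # source_ref: "<platform>:<path>".
    return ProvenanceFields(run_id, trace_id, f"{platform}:{path}", intent=intent)
```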
Pipeline Position
code
multi-source-collector (23:22 PT)
↓
inbox files
↓
import-conversation-logs (23:28 PT) ← YOU ARE HERE
↓
messages_v2 table
↓
conversation-exporter (23:35 PT)
↓
executive-daily-report (00:02 PT)
Implementation
File: app/services/mcp/jobs/ingest/daily_ingest_runner.py
Key Functions
- main() - Entry point, orchestrates all sources
- process_inbox_source() - Per-source processing
- check_routing_mismatches() - Signature validation
- archive_processed_files() - Post-processing cleanup
Using MemoryClient
python
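import os
from uuid import uuid4

from app.services.memory import MemoryClient, MemoryRecord, Provenance

# date, source_name, inbox_path, and rows are supplied by the surrounding runner.
memory = MemoryClient()

# One Provenance object describes the whole run for this source.
provenance = Provenance(
    run_id=f"ingest_{date}_{uuid4().hex[:8]}",
    trace_id=os.environ.get("LANGFUSE_TRACE_ID", ""),
    source_ref=f"{source_name}:{inbox_path}",
    actor="import-conversation-logs",
)

# Wrap each parsed row in a MemoryRecord for the ops lane.
records = [
    MemoryRecord(
        lane="ops",
        type="conversation_message",
        namespace="cosmocrat",
        content=row,
        metadata={"source": source_name},
    )
    for row in rows
]

# Batch upsert through MemoryClient; pii_mode matches the PII Handling mode.
results = memory.upsert_mem_batch(records, provenance, pii_mode="redact")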
Error Handling
| Error | Action | Alert |
|---|---|---|
| Empty inbox | Log warning, continue | No |
| Source processor error | Log error, continue to next source | Slack summary |
| Insert batch failure | Retry once, log partial results | Slack summary |
| Routing mismatch | Quarantine file, log warning | Slack if >5 files |
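The "retry once, log partial results" policy for insert batch failures can be sketched as follows. `insert_batch` is a hypothetical callable that returns the number of rows written; it stands in for whatever the runner actually calls, and the logger name is likewise an assumption.

```python
import logging
from typing import Callable, Sequence

logger = logging.getLogger("import-conversation-logs")


def insert_with_single_retry(
    insert_batch: Callable[[Sequence[dict]], int],
    batch: Sequence[dict],
) -> int:
    """Try a batch insert, retry once on failure, and return rows written."""
    for attempt in (1, 2):
        try:
            return insert_batch(batch)
        except Exception as exc:
            logger.error("insert attempt %d failed: %s", attempt, exc)
    # Both attempts failed: report zero rows so the caller can log partial
    # results and continue with the next batch or source.
    return 0
```

Returning a count instead of raising keeps one bad batch from aborting the remaining sources, in line with the "continue" actions in the table.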
Guards
- skip_if_empty_inbox - No-op if all inboxes empty
- validate_source_signatures - Detect misrouted files
- quarantine_misrouted_files - Move bad files to /cosmocrat/quarantine/
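Two of these guards are simple enough to sketch with the standard library. The function names and signatures below are hypothetical, not the runner's actual API, and signature validation is omitted because the manifest does not describe the signature rules.

```python
import shutil
from pathlib import Path
from typing import Iterable


def all_inboxes_empty(inbox_root: Path, sources: Iterable[str]) -> bool:
    """True when no source directory under inbox_root contains a file."""
    for source in sources:
        source_dir = inbox_root / source
        if source_dir.is_dir() and any(p.is_file() for p in source_dir.rglob("*")):
            return False
    return True


def quarantine_file(path: Path, quarantine_root: Path) -> Path:
    """Move a misrouted file into quarantine_root and return its new location.

    Assumption: file names are unique; name collisions are not handled here.
    """
    quarantine_root.mkdir(parents=True, exist_ok=True)
    target = quarantine_root / path.name
    shutil.move(str(path), str(target))
    return target
```

A runner could call `all_inboxes_empty` first and exit early, then call `quarantine_file` for each file that fails validation.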
Environment Variables
| Variable | Required | Description |
|---|---|---|
| CLICKHOUSE_URL | Yes | ClickHouse HTTP endpoint |
| CLICKHOUSE_USER | Yes | ClickHouse username |
| CLICKHOUSE_PASSWORD | Yes | ClickHouse password |
| INBOX_ROOT | No | Default: /cosmocrat/inbox/raw |
| ARCHIVE_ROOT | No | Default: /cosmocrat/conversation_archive |
| LANGFUSE_TRACE_ID | No | For observability linking |
| COSMO_NOTIFY_SLACK | No | Default: true |
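One way to read these variables with their defaults is sketched below. `IngestConfig` and `load_config` are hypothetical names, and the parsing of COSMO_NOTIFY_SLACK (any value other than "false", "0", or "no" counts as enabled) is an assumption; the manifest only states that the default is true.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class IngestConfig:
    clickhouse_url: str
    clickhouse_user: str
    clickhouse_password: str
    inbox_root: str
    archive_root: str
    langfuse_trace_id: str
    notify_slack: bool


def load_config(env: Mapping[str, str] = os.environ) -> IngestConfig:
    """Read the documented variables, failing fast on missing required ones."""
    required = ("CLICKHOUSE_URL", "CLICKHOUSE_USER", "CLICKHOUSE_PASSWORD")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError(
            f"missing required environment variables: {', '.join(missing)}"
        )
    # Assumption: only these literal values switch Slack notifications off.
    slack_raw = env.get("COSMO_NOTIFY_SLACK", "true").strip().lower()
    return IngestConfig(
        clickhouse_url=env["CLICKHOUSE_URL"],
        clickhouse_user=env["CLICKHOUSE_USER"],
        clickhouse_password=env["CLICKHOUSE_PASSWORD"],
        inbox_root=env.get("INBOX_ROOT", "/cosmocrat/inbox/raw"),
        archive_root=env.get("ARCHIVE_ROOT", "/cosmocrat/conversation_archive"),
        langfuse_trace_id=env.get("LANGFUSE_TRACE_ID", ""),
        notify_slack=slack_raw not in {"false", "0", "no"},
    )
```

Passing a plain dict as `env` makes the loader easy to exercise in tests without touching the process environment.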
Testing
bash
# Run locally
python -m app.services.mcp.jobs.ingest.daily_ingest_runner

# With backfill
python -m app.services.mcp.jobs.ingest.daily_ingest_runner --backfill-days 7

# Integration tests
pytest tests/skills/test_import_conversation_logs.py -v
Metrics
| Metric | Type | Description |
|---|---|---|
| ingest.conversation.duration_ms | Gauge | Total run time |
| ingest.conversation.messages_count | Counter | Messages ingested |
| ingest.conversation.sources_processed | Counter | Sources with data |
| ingest.conversation.files_processed | Counter | Files processed |
| ingest.conversation.errors_count | Counter | Errors encountered |
| ingest.conversation.quarantined_count | Counter | Files quarantined |
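The manifest does not say which metrics backend receives these values, so the sketch below only accumulates them in memory under the documented names. `RunMetrics` is a hypothetical helper; shipping the snapshot to a real backend is left out.

```python
import time
from collections import Counter
from typing import Dict


class RunMetrics:
    """In-memory collector for one ingest run (hypothetical helper)."""

    PREFIX = "ingest.conversation."

    def __init__(self) -> None:
        self._counters: Counter = Counter()
        self._started = time.monotonic()

    def incr(self, name: str, amount: int = 1) -> None:
        # Counters such as messages_count or errors_count only ever go up.
        self._counters[self.PREFIX + name] += amount

    def snapshot(self) -> Dict[str, int]:
        # duration_ms is a gauge: elapsed time since the collector was created.
        values = dict(self._counters)
        elapsed_ms = int((time.monotonic() - self._started) * 1000)
        values[self.PREFIX + "duration_ms"] = elapsed_ms
        return values
```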
Changelog
- 0.1.0 (2025-12-22): Initial manifest, migrated to MemoryClient