Import Conversation Logs

import-conversation-logs

Lane: ops
Version: 0.1.0
Owner: system
Schedule: 23:28 PT daily

Purpose

Ingest conversations from multiple AI chat platforms into the unified messages_v2 table. This is the foundation of the memory pipeline: every other skill depends on this data.

Sources

| Platform | Inbox Path | Format |
|----------|------------|--------|
| ChatGPT | /cosmocrat/inbox/raw/chatgpt/ | OpenAI export JSON |
| Claude | /cosmocrat/inbox/raw/claude/ | Anthropic export JSON |
| Gemini | /cosmocrat/inbox/raw/gemini/ | Google export JSON |
| Cursor | /cosmocrat/inbox/raw/cursor/ | SQLite global_state.vscdb |
| Codex | /cosmocrat/inbox/raw/codex/ | JSON files |
| Grok | /cosmocrat/inbox/raw/grok/ | JSON files |
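
As an illustration of how the inbox layout above could be scanned, here is a minimal sketch. The names `SOURCE_PATTERNS` and `discover_inbox_files` are hypothetical and do not come from the runner; the glob patterns are assumptions inferred from the Format column.

```python
from pathlib import Path

# Hypothetical mapping inferred from the Sources table: platform -> file pattern.
SOURCE_PATTERNS = {
    "chatgpt": "*.json",
    "claude": "*.json",
    "gemini": "*.json",
    "cursor": "*.vscdb",
    "codex": "*.json",
    "grok": "*.json",
}


def discover_inbox_files(inbox_root: str = "/cosmocrat/inbox/raw") -> dict[str, list[Path]]:
    """Return pending files per platform, sorted for a stable processing order."""
    found: dict[str, list[Path]] = {}
    for platform, pattern in SOURCE_PATTERNS.items():
        source_dir = Path(inbox_root) / platform
        # rglob also picks up nested folders such as web/2025-12-22/export.json.
        found[platform] = sorted(source_dir.rglob(pattern)) if source_dir.is_dir() else []
    return found
```

A platform whose inbox directory is missing simply maps to an empty list, so the caller can treat "no directory" and "no files" the same way.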

Memory Contract

Writes

| Type | Table | Retention | Description |
|------|-------|-----------|-------------|
| conversation_message | messages_v2 | permanent | All chat messages |
| conversation_topic | conversation_topics | permanent | Extracted topics |

Access Mode

service_only: all writes go through MemoryClient, with no direct ClickHouse access from the skill itself.

PII Handling

  • Mode: redact
  • Fields: content
  • Uses app/tools/pii_utils.py for redaction
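
To make the redact mode concrete, the sketch below shows one way field-level redaction can work. It is not the API of app/tools/pii_utils.py, which is not shown here; `redact_text`, `redact_record`, and both regex patterns are hypothetical stand-ins.

```python
import re

# Hypothetical patterns for illustration; the real rules live in app/tools/pii_utils.py.
_PATTERNS = {
    "EMAIL": re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}"),
    "PHONE": re.compile(r"\b\d{3}[-.\s]\d{3}[-.\s]\d{4}\b"),
}


def redact_text(text: str) -> str:
    """Replace each match with a bracketed label such as [EMAIL]."""
    for label, pattern in _PATTERNS.items():
        text = pattern.sub(f"[{label}]", text)
    return text


def redact_record(record: dict, fields=("content",)) -> dict:
    """Return a copy of the record with the listed string fields redacted."""
    redacted = dict(record)
    for field in fields:
        if isinstance(redacted.get(field), str):
            redacted[field] = redact_text(redacted[field])
    return redacted
```

Only the fields named in `fields` are touched, which mirrors the "Fields: content" setting above; other keys pass through unchanged.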

Provenance

Every ingested message includes:

| Field | Source | Example |
|-------|--------|---------|
| run_id | Generated | ingest_2025-12-22_a1b2c3d4 |
| trace_id | Langfuse or run_id | trace_xyz789 |
| source_ref | Platform:path | chatgpt:/cosmocrat/inbox/raw/chatgpt/web/2025-12-22/export.json |
| actor | Fixed | import-conversation-logs |
| intent | Optional | daily_conversation_ingest |
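
The field rules in this table can be sketched as a small builder. `ProvenanceSketch` and `build_provenance` are hypothetical names used for illustration only; the real `Provenance` class comes from app.services.memory and may differ. The fallback of trace_id to run_id is an assumption read from the "Langfuse or run_id" entry.

```python
import os
from dataclasses import dataclass
from datetime import date
from typing import Optional
from uuid import uuid4


@dataclass(frozen=True)
class ProvenanceSketch:
    """Hypothetical stand-in for the real Provenance class."""
    run_id: str
    trace_id: str
    source_ref: str
    actor: str = "import-conversation-logs"  # fixed for this skill
    intent: Optional[str] = None             # optional, e.g. daily_conversation_ingest


def build_provenance(platform: str, path: str,
                     run_date: Optional[date] = None,
                     intent: Optional[str] = None) -> ProvenanceSketch:
    run_date = run_date or date.today()
    # Shape: ingest_<ISO date>_<8 hex chars>, as in the table's example.
    run_id = f"ingest_{run_date.isoformat()}_{uuid4().hex[:8]}"
    # Assumption: reuse run_id when no Langfuse trace ID is set.
    trace_id = os.environ.get("LANGFUSE_TRACE_ID") or run_id
    return ProvenanceSketch(run_id, trace_id, f"{platform}:{path}", intent=intent)
```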

Pipeline Position

```
multi-source-collector (23:22 PT)
         ↓
    inbox files
         ↓
import-conversation-logs (23:28 PT) ← YOU ARE HERE
         ↓
    messages_v2 table
         ↓
conversation-exporter (23:35 PT)
         ↓
executive-daily-report (00:02 PT)
```

Implementation

File: app/services/mcp/jobs/ingest/daily_ingest_runner.py

Key Functions

  • main() - Entry point, orchestrates all sources
  • process_inbox_source() - Per-source processing
  • check_routing_mismatches() - Signature validation
  • archive_processed_files() - Post-processing cleanup

Using MemoryClient

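```python
import os
from datetime import date
from uuid import uuid4

from app.services.memory import MemoryClient, MemoryRecord, Provenance

# source_name, inbox_path, and rows are supplied by the per-source processor.
memory = MemoryClient()

run_date = date.today().isoformat()  # e.g. 2025-12-22
run_id = f"ingest_{run_date}_{uuid4().hex[:8]}"
provenance = Provenance(
    run_id=run_id,
    # Fall back to run_id when no Langfuse trace is set (see Provenance).
    trace_id=os.environ.get("LANGFUSE_TRACE_ID") or run_id,
    source_ref=f"{source_name}:{inbox_path}",
    actor="import-conversation-logs",
)

# One record per parsed message row.
records = [
    MemoryRecord(
        lane="ops",
        type="conversation_message",
        namespace="cosmocrat",
        content=row,
        metadata={"source": source_name},
    )
    for row in rows
]

# pii_mode matches the PII Handling mode declared above.
results = memory.upsert_mem_batch(records, provenance, pii_mode="redact")
```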

Error Handling

| Error | Action | Alert |
|-------|--------|-------|
| Empty inbox | Log warning, continue | No |
| Source processor error | Log error, continue to next source | Slack summary |
| Insert batch failure | Retry once, log partial results | Slack summary |
| Routing mismatch | Quarantine file, log warning | Slack if >5 files |
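
The "retry once" and "continue to next source" policies in this table can be sketched as follows. Both function names are hypothetical and are not taken from daily_ingest_runner.py; the insert and processor callables are placeholders the caller would supply.

```python
import logging
from typing import Callable, Dict, List, Sequence, Tuple

logger = logging.getLogger("import-conversation-logs")


def insert_with_single_retry(insert_batch: Callable[[Sequence], None],
                             batch: Sequence) -> int:
    """Attempt the insert, retry once on failure; return rows written (0 if both fail)."""
    for attempt in (1, 2):
        try:
            insert_batch(batch)
            return len(batch)
        except Exception as exc:  # broad on purpose: any failure triggers the retry
            logger.error("batch insert attempt %d failed: %s", attempt, exc)
    return 0


def run_all_sources(processors: Dict[str, Callable[[], int]]) -> Tuple[int, List[str]]:
    """Run every source processor; a failure in one never blocks the rest."""
    total, failed = 0, []
    for name, process in processors.items():
        try:
            count = process()
        except Exception as exc:
            logger.error("source %s failed: %s", name, exc)
            failed.append(name)  # collected for the Slack summary
            continue
        if count == 0:
            logger.warning("source %s: inbox empty", name)
        total += count
    return total, failed
```

Returning the list of failed sources, rather than raising, lets the caller send a single Slack summary at the end of the run.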

Guards

  1. skip_if_empty_inbox - No-op if all inboxes empty
  2. validate_source_signatures - Detect misrouted files
  3. quarantine_misrouted_files - Move bad files to /cosmocrat/quarantine/
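
A minimal sketch of guards 2 and 3, assuming a format-level check is enough to spot a misrouted file: SQLite databases begin with the 16-byte header `SQLite format 3\0`, while JSON exports begin with `{` or `[`. The function names and the per-platform quarantine subfolder are hypothetical; the actual signature rules in `check_routing_mismatches()` are not shown in this manifest and may be stricter.

```python
import shutil
from pathlib import Path
from typing import Iterable, List

SQLITE_MAGIC = b"SQLite format 3\x00"  # standard SQLite file header
SQLITE_PLATFORMS = {"cursor"}          # per the Sources table; all others are JSON


def matches_expected_format(platform: str, path: Path) -> bool:
    """Check the first bytes of a file against the format its inbox expects."""
    with path.open("rb") as fh:
        head = fh.read(64)
    if platform in SQLITE_PLATFORMS:
        return head.startswith(SQLITE_MAGIC)
    # Skip a UTF-8 BOM and leading whitespace before looking for JSON.
    return head.lstrip(b"\xef\xbb\xbf \t\r\n")[:1] in (b"{", b"[")


def quarantine_misrouted(platform: str, files: Iterable[Path],
                         quarantine_root: str = "/cosmocrat/quarantine") -> List[Path]:
    """Move files that fail the format check; return their new locations."""
    moved = []
    for path in files:
        if matches_expected_format(platform, path):
            continue
        target_dir = Path(quarantine_root) / platform  # hypothetical layout
        target_dir.mkdir(parents=True, exist_ok=True)
        target = target_dir / path.name
        shutil.move(str(path), str(target))
        moved.append(target)
    return moved
```

The length of the returned list is what a caller would compare against the "Slack if >5 files" threshold from the Error Handling table.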

Environment Variables

| Variable | Required | Description |
|----------|----------|-------------|
| CLICKHOUSE_URL | Yes | ClickHouse HTTP endpoint |
| CLICKHOUSE_USER | Yes | ClickHouse username |
| CLICKHOUSE_PASSWORD | Yes | ClickHouse password |
| INBOX_ROOT | No | Default: /cosmocrat/inbox/raw |
| ARCHIVE_ROOT | No | Default: /cosmocrat/conversation_archive |
| LANGFUSE_TRACE_ID | No | For observability linking |
| COSMO_NOTIFY_SLACK | No | Default: true |
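
One way to load these variables with the documented defaults is sketched below. `IngestConfig` and `load_config` are hypothetical names, and the set of strings accepted as "true" for COSMO_NOTIFY_SLACK is an assumption; the manifest only states the default.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional

REQUIRED = ("CLICKHOUSE_URL", "CLICKHOUSE_USER", "CLICKHOUSE_PASSWORD")


@dataclass(frozen=True)
class IngestConfig:
    clickhouse_url: str
    clickhouse_user: str
    clickhouse_password: str
    inbox_root: str
    archive_root: str
    langfuse_trace_id: Optional[str]
    notify_slack: bool


def load_config(env: Optional[Mapping[str, str]] = None) -> IngestConfig:
    """Read settings from env (defaults to os.environ), failing fast on missing ones."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise RuntimeError(f"missing required environment variables: {', '.join(missing)}")
    return IngestConfig(
        clickhouse_url=env["CLICKHOUSE_URL"],
        clickhouse_user=env["CLICKHOUSE_USER"],
        clickhouse_password=env["CLICKHOUSE_PASSWORD"],
        inbox_root=env.get("INBOX_ROOT", "/cosmocrat/inbox/raw"),
        archive_root=env.get("ARCHIVE_ROOT", "/cosmocrat/conversation_archive"),
        langfuse_trace_id=env.get("LANGFUSE_TRACE_ID") or None,
        # Assumption: only these spellings enable Slack notifications.
        notify_slack=env.get("COSMO_NOTIFY_SLACK", "true").strip().lower() in {"1", "true", "yes"},
    )
```

Passing a plain dict as `env` keeps the loader easy to exercise in tests without touching the process environment.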

Testing

```bash
# Run locally
python -m app.services.mcp.jobs.ingest.daily_ingest_runner

# With backfill
python -m app.services.mcp.jobs.ingest.daily_ingest_runner --backfill-days 7

# Integration tests
pytest tests/skills/test_import_conversation_logs.py -v
```

Metrics

| Metric | Type | Description |
|--------|------|-------------|
| ingest.conversation.duration_ms | Gauge | Total run time |
| ingest.conversation.messages_count | Counter | Messages ingested |
| ingest.conversation.sources_processed | Counter | Sources with data |
| ingest.conversation.files_processed | Counter | Files processed |
| ingest.conversation.errors_count | Counter | Errors encountered |
| ingest.conversation.quarantined_count | Counter | Files quarantined |
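
For illustration, the counters and the duration gauge above could be accumulated in-process during a run and emitted once at the end. `IngestMetrics` is a hypothetical helper, not part of the runner, and how the snapshot reaches the metrics backend is left open.

```python
import time
from collections import Counter
from typing import Dict


class IngestMetrics:
    """Hypothetical per-run accumulator for the metrics listed above."""

    PREFIX = "ingest.conversation."

    def __init__(self) -> None:
        self._started = time.monotonic()  # monotonic clock: immune to wall-clock jumps
        self._counters: Counter = Counter()

    def incr(self, name: str, amount: int = 1) -> None:
        """Increase a counter such as messages_count or errors_count."""
        self._counters[self.PREFIX + name] += amount

    def snapshot(self) -> Dict[str, int]:
        """Return all counters plus the duration_ms gauge at this moment."""
        out = dict(self._counters)
        out[self.PREFIX + "duration_ms"] = int((time.monotonic() - self._started) * 1000)
        return out
```

A run would call, for example, `metrics.incr("messages_count", len(records))` after each batch and publish `metrics.snapshot()` when it finishes.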

Changelog

  • 0.1.0 (2025-12-22): Initial manifest, migrated to MemoryClient