LLMemory Basic Usage
Installation
uv add llmemory # or pip install llmemory
Prerequisites:
- •Python 3.10 or higher
- •PostgreSQL 14+ (tested up to PostgreSQL 16)
- •pgvector extension 0.5.0+
- •OpenAI API key (or configure local embeddings)
Installing pgvector:
# Ubuntu/Debian
sudo apt-get install postgresql-16-pgvector
# macOS with Homebrew
brew install pgvector
# Then enable the extension in the target database:
psql -d your_database -c "CREATE EXTENSION IF NOT EXISTS vector;"
Verifying pgvector installation:
SELECT * FROM pg_extension WHERE extname = 'vector';
-- Should return one row if installed correctly
API Overview
This skill documents core llmemory operations:
- •LLMemory - Main interface class
- •DocumentType - Enum for document types
- •SearchType - Enum for search modes
- •ChunkingStrategy - Enum for chunking strategies
- •add_document() - Add and process documents
- •search() - Search for documents
- •search_with_routing() - Search with automatic query routing (detects answerable queries)
- •search_with_documents() - Search and return results with document metadata
- •list_documents() - List documents with pagination
- •get_document() - Retrieve a document (owner-scoped)
- •get_document_chunks() - Get chunks with pagination (owner-scoped)
- •get_chunk_count() - Get number of chunks for a document (owner-scoped)
- •delete_document() / delete_documents() - Delete documents (owner-scoped)
- •get_statistics() - Get owner statistics
- •db_manager - Access underlying database manager
- •initialize() / close() - Lifecycle management
Quick Start
import asyncio
from llmemory import LLMemory, DocumentType, SearchType

async def main():
    # Initialize
    memory = LLMemory(
        connection_string="postgresql://localhost/mydb",
        openai_api_key="sk-..."
    )
    await memory.initialize()

    # Add a document
    result = await memory.add_document(
        owner_id="workspace-1",
        id_at_origin="user-123",
        document_name="example.txt",
        document_type=DocumentType.TEXT,
        content="Your document content here...",
        metadata={"category": "example"}
    )
    print(f"Created document with {result.chunks_created} chunks")

    # Search
    results = await memory.search(
        owner_id="workspace-1",
        query_text="your search query",
        search_type=SearchType.HYBRID,
        limit=5
    )
    for result in results:
        print(f"[{result.score:.3f}] {result.content[:80]}...")

    # Clean up
    await memory.close()

asyncio.run(main())
Complete API Documentation
LLMemory
Main interface for document operations.
Constructor:
LLMemory(
connection_string: Optional[str] = None,
openai_api_key: Optional[str] = None,
config: Optional[LLMemoryConfig] = None,
db_manager: Optional[AsyncDatabaseManager] = None
)
Parameters:
- •connection_string (str, optional): PostgreSQL connection URL (format: postgresql://user:pass@host:port/database). Ignored if db_manager is provided.
- •openai_api_key (str, optional): OpenAI API key for embeddings. Can also be set via the OPENAI_API_KEY environment variable.
- •config (LLMemoryConfig, optional): Configuration object. Defaults to config from environment if not provided.
- •db_manager (AsyncDatabaseManager, optional): Existing database manager from shared pool (for production apps with multiple services).
Raises:
- •ConfigurationError: If neither connection_string nor db_manager is provided, or if configuration is invalid.
Example:
from llmemory import LLMemory
# Simple initialization
memory = LLMemory(
connection_string="postgresql://localhost/mydb",
openai_api_key="sk-..."
)
await memory.initialize()
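The connection_string format listed above can be sanity-checked before it is handed to the constructor. The following is a minimal sketch using only the standard library. `check_connection_string` is a hypothetical helper and not part of llmemory. It checks the URL shape only, not whether the server is reachable, and accepting the `postgres://` scheme is an assumption based on what libpq accepts.

```python
from urllib.parse import urlsplit

def check_connection_string(url: str) -> dict:
    """Split a PostgreSQL URL into its parts, raising ValueError on obvious mistakes."""
    parts = urlsplit(url)
    # libpq accepts both spellings; whether llmemory does is an assumption.
    if parts.scheme not in ("postgresql", "postgres"):
        raise ValueError(f"unexpected scheme: {parts.scheme!r}")
    database = parts.path.lstrip("/")
    if not database:
        raise ValueError("connection string has no database name")
    return {
        "user": parts.username,
        "host": parts.hostname,
        "port": parts.port,  # None when the URL omits the port
        "database": database,
    }

info = check_connection_string("postgresql://user:pass@host:5432/database")
```

Running a check like this at startup turns a malformed URL into an immediate, readable error instead of a failure during initialize().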
LLMemory.from_db_manager()
Create instance from existing AsyncDatabaseManager (shared pool pattern).
Signature:
@classmethod
def from_db_manager(
cls,
db_manager: AsyncDatabaseManager,
openai_api_key: Optional[str] = None,
config: Optional[LLMemoryConfig] = None
) -> LLMemory
Parameters:
- •db_manager (AsyncDatabaseManager, required): Existing database manager with schema already set
- •openai_api_key (str, optional): OpenAI API key
- •config (LLMemoryConfig, optional): Configuration object
Returns:
- •LLMemory: Configured instance
Example:
from pgdbm import AsyncDatabaseManager, DatabaseConfig
from llmemory import LLMemory
# Create shared pool
config = DatabaseConfig(connection_string="postgresql://localhost/mydb")
shared_pool = await AsyncDatabaseManager.create_shared_pool(config)
# Create llmemory with shared pool
db_manager = AsyncDatabaseManager(pool=shared_pool, schema="llmemory")
memory = LLMemory.from_db_manager(
db_manager,
openai_api_key="sk-..."
)
await memory.initialize()
db_manager
Get the underlying database manager for health checks and monitoring.
Property:
@property
def db_manager(self) -> Optional[AsyncDatabaseManager]
Returns:
- •Optional[AsyncDatabaseManager]: Database manager instance if initialized, None otherwise
Example:
from llmemory import LLMemory
memory = LLMemory(connection_string="postgresql://localhost/mydb")
await memory.initialize()
# Access underlying database manager
db_mgr = memory.db_manager
if db_mgr:
    # Check connection pool status
    pool_status = await db_mgr.get_pool_status()
    print(f"Active connections: {pool_status['active']}")
    print(f"Idle connections: {pool_status['idle']}")
    # Run health check
    is_healthy = await db_mgr.health_check()
    print(f"Database healthy: {is_healthy}")
When to use:
- •Health monitoring and observability
- •Accessing connection pool metrics
- •Database diagnostics
- •Integration with monitoring systems
initialize()
Initialize the library and database schema.
Signature:
async def initialize() -> None
Raises:
- •DatabaseError: If database initialization fails
- •ConfigurationError: If configuration is invalid
Example:
memory = LLMemory(connection_string="postgresql://localhost/mydb")
await memory.initialize()  # Sets up tables, migrations, indexes
close()
Close all connections and cleanup resources.
Signature:
async def close() -> None
Example:
await memory.close()
Context Manager Pattern (Recommended):
async with LLMemory(connection_string="...") as memory:
    # Use memory here
    results = await memory.search(...)
# Automatically closed
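To make the lifecycle behind the context manager pattern concrete, here is a small sketch with a stand-in class. `FakeMemory` is hypothetical and touches no database. That entering the block calls initialize() is an assumption about LLMemory. The text above only states that the instance is closed automatically.

```python
import asyncio

class FakeMemory:
    """Stand-in for LLMemory that records lifecycle calls instead of using a database."""

    def __init__(self) -> None:
        self.events: list[str] = []

    async def initialize(self) -> None:
        self.events.append("initialize")

    async def close(self) -> None:
        self.events.append("close")

    async def __aenter__(self) -> "FakeMemory":
        await self.initialize()
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Runs even when the body raises, so resources are always released.
        await self.close()

async def demo() -> list[str]:
    memory = FakeMemory()
    try:
        async with memory:
            memory.events.append("work")
            raise RuntimeError("simulated failure inside the block")
    except RuntimeError:
        pass
    return memory.events

events = asyncio.run(demo())
```

The point of the pattern is the last step: close() still runs when the body raises, which a manual initialize()/close() pair only guarantees when wrapped in try/finally.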
Document Types
class DocumentType(str, Enum):
    PDF = "pdf"
    MARKDOWN = "markdown"
    CODE = "code"
    TEXT = "text"
    HTML = "html"
    DOCX = "docx"
    EMAIL = "email"
    REPORT = "report"
    CHAT = "chat"
    PRESENTATION = "presentation"
    LEGAL_DOCUMENT = "legal_document"
    TECHNICAL_DOC = "technical_doc"
    BUSINESS_REPORT = "business_report"
    UNKNOWN = "unknown"
Search Types
class SearchType(str, Enum):
    VECTOR = "vector"  # Vector similarity search only
    TEXT = "text"      # Full-text search only
    HYBRID = "hybrid"  # Combines vector + text (recommended)
Chunking Strategies
class ChunkingStrategy(str, Enum):
    HIERARCHICAL = "hierarchical"      # Default - Creates parent and child chunks for better context
    FIXED_SIZE = "fixed_size"          # Fixed-size chunks with overlap
    SEMANTIC = "semantic"              # Chunks based on semantic boundaries (slower, higher quality)
    SLIDING_WINDOW = "sliding_window"  # Sliding window with configurable overlap
Strategy descriptions:
- •HIERARCHICAL (default): Creates hierarchical parent and child chunks. Parent chunks provide broader context while child chunks are used for precise retrieval. Best for most use cases.
- •FIXED_SIZE: Creates fixed-size chunks with configurable overlap. Simple and fast, good for uniform documents.
- •SEMANTIC: Chunks based on semantic boundaries (paragraphs, sections). Slower but produces higher quality chunks that respect document structure.
- •SLIDING_WINDOW: Creates overlapping chunks using a sliding window approach. Good for ensuring no information is lost at chunk boundaries.
Usage:
from llmemory import ChunkingStrategy
# Use enum value
result = await memory.add_document(
owner_id="workspace-1",
id_at_origin="user-123",
document_name="example.txt",
document_type=DocumentType.TEXT,
content="Your document content...",
chunking_strategy=ChunkingStrategy.SEMANTIC # Use enum
)
# Or use string value (also valid)
result = await memory.add_document(
owner_id="workspace-1",
id_at_origin="user-123",
document_name="example.txt",
document_type=DocumentType.TEXT,
content="Your document content...",
chunking_strategy="hierarchical" # String also works
)
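The FIXED_SIZE and SLIDING_WINDOW descriptions both rely on overlapping windows. The sketch below illustrates that idea on plain characters. It is not llmemory's chunker: `chunk_with_overlap` is a hypothetical helper, and the real implementation presumably measures size in tokens and honors a ChunkingConfig.

```python
def chunk_with_overlap(text: str, size: int, overlap: int) -> list[str]:
    """Split text into windows of `size` characters that share `overlap` characters."""
    if size <= 0:
        raise ValueError("size must be positive")
    if not 0 <= overlap < size:
        raise ValueError("overlap must satisfy 0 <= overlap < size")
    step = size - overlap  # how far each window advances
    chunks: list[str] = []
    start = 0
    while start < len(text):
        chunks.append(text[start:start + size])
        if start + size >= len(text):
            break  # this window already reaches the end of the text
        start += step
    return chunks

chunks = chunk_with_overlap("abcdefghij", size=4, overlap=2)
# chunks == ["abcd", "cdef", "efgh", "ghij"]
```

With overlap=0 the windows no longer share characters, so a sentence that straddles a boundary is split between two chunks. That is the information loss the overlap is meant to reduce.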
Model Classes
SearchResult
Search result from any search operation.
Fields:
- •chunk_id (UUID): Chunk identifier
- •document_id (UUID): Document identifier
- •content (str): Chunk content
- •metadata (Dict[str, Any]): Chunk metadata
- •score (float): Overall relevance score
- •similarity (float, optional): Vector similarity score (0-1)
- •text_rank (float, optional): Full-text search rank
- •rrf_score (float, optional): Reciprocal Rank Fusion score
- •rerank_score (float, optional): Reranker score (when reranking enabled)
- •summary (str, optional): Chunk summary if generated
- •parent_chunks (List[DocumentChunk]): Surrounding chunks if requested
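For readers unfamiliar with the rrf_score field, the sketch below shows the standard Reciprocal Rank Fusion formula, where each ranked list contributes 1 / (k + rank) per item. It illustrates the general technique only. The constant k=60 is the value commonly used in the literature, and whether llmemory uses it (or how it combines this with the other scores) is an assumption.

```python
def reciprocal_rank_fusion(rankings: list[list[str]], k: int = 60) -> dict[str, float]:
    """Fuse several ranked lists of IDs into one score per ID."""
    scores: dict[str, float] = {}
    for ranking in rankings:
        for rank, item in enumerate(ranking, start=1):
            # Items near the top of any list contribute the most.
            scores[item] = scores.get(item, 0.0) + 1.0 / (k + rank)
    return scores

vector_ranking = ["a", "b", "c"]  # hypothetical chunk IDs, best first
text_ranking = ["b", "d", "a"]
fused = reciprocal_rank_fusion([vector_ranking, text_ranking])
ordered = sorted(fused, key=fused.get, reverse=True)
# ordered == ["b", "a", "d", "c"]
```

Because only ranks are used, the method does not need the vector similarity and the full-text rank to be on comparable scales.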
EnrichedSearchResult
Extended search result with document metadata (inherits from SearchResult).
Additional Fields:
- •document_name (str): Name of the source document
- •document_type (str): Type of document
- •document_metadata (Dict[str, Any]): Document-level metadata
When used: Returned by search_with_documents()
SearchResultWithDocuments
Container for enriched search results.
Fields:
- •results (List[EnrichedSearchResult]): Enriched search results
- •total (int): Total number of results
DocumentAddResult
Result of adding a document.
Fields:
- •document (Document): Created document object with all fields
- •chunks_created (int): Number of chunks created
- •embeddings_created (int): Number of embeddings generated
- •processing_time_ms (float): Processing time in milliseconds
DocumentListResult
Result of listing documents with pagination.
Fields:
- •documents (List[Document]): Document objects
- •total (int): Total matching documents (before pagination)
- •limit (int): Applied limit
- •offset (int): Applied offset
DocumentWithChunks
Document with optional chunks.
Fields:
- •document (Document): Document object
- •chunks (Optional[List[DocumentChunk]]): Chunks if requested
- •chunk_count (int): Total number of chunks
OwnerStatistics
Statistics for an owner's documents.
Fields:
- •document_count (int): Total documents
- •chunk_count (int): Total chunks
- •total_size_bytes (int): Estimated total size
- •document_type_breakdown (Optional[Dict[DocumentType, int]]): Count by document type
- •created_date_range (Optional[Tuple[datetime, datetime]]): (min_date, max_date) of document creation
DeleteResult
Result of batch delete operation.
Fields:
- •deleted_count (int): Number of documents deleted
- •deleted_document_ids (List[UUID]): IDs of deleted documents
EmbeddingStatus
Enum for embedding generation status.
class EmbeddingStatus(str, Enum):
    PENDING = "pending"        # Job queued but not started
    PROCESSING = "processing"  # Currently generating embeddings
    COMPLETED = "completed"    # Successfully completed
    FAILED = "failed"          # Failed with error
EmbeddingJob
Represents a background embedding generation job.
Fields:
- •chunk_id (UUID): Chunk being processed
- •provider_id (str): Embedding provider ID
- •status (EmbeddingStatus): Current status
- •retry_count (int): Number of retries attempted
- •error_message (Optional[str]): Error details if failed
- •created_at (datetime): When job was created
- •processed_at (Optional[datetime]): When processing finished
SearchQuery
Internal search query model (rarely used directly).
Fields:
- •owner_id (str): Owner identifier
- •query_text (str): Search query text
- •search_type (SearchType): Type of search
- •limit (int): Maximum results
- •alpha (float): Hybrid search weight
- •metadata_filter (Optional[Dict[str, Any]]): Metadata filter
- •id_at_origin (Optional[str]): Single origin filter
- •id_at_origins (Optional[List[str]]): Multiple origins filter
- •date_from (Optional[datetime]): Start date
- •date_to (Optional[datetime]): End date
- •include_parent_context (bool): Include parent chunks
- •context_window (int): Number of parent chunks
- •rerank (bool): Enable reranking
- •enable_query_expansion (bool): Enable query expansion
- •max_query_variants (int): Max query variants
add_document()
Add a document and process it into searchable chunks.
Signature:
async def add_document(
owner_id: str,
id_at_origin: str,
document_name: str,
document_type: Union[DocumentType, str],
content: str,
document_date: Optional[datetime] = None,
metadata: Optional[Dict[str, Any]] = None,
chunking_strategy: str = "hierarchical",
chunking_config: Optional[ChunkingConfig] = None,
generate_embeddings: bool = True
) -> DocumentAddResult
Parameters:
- •owner_id (str, required): Owner identifier for multi-tenancy (e.g., "workspace-123", "tenant-abc")
- •id_at_origin (str, required): Origin identifier within owner (e.g., "user-456", "thread-789")
- •document_name (str, required): Name of the document
- •document_type (DocumentType or str, required): Type of document
- •content (str, required): Full document content
- •document_date (datetime, optional): Document date for temporal filtering
- •metadata (Dict[str, Any], optional): Custom metadata (searchable via metadata_filter)
- •chunking_strategy (str, default: "hierarchical"): Chunking strategy to use
- •chunking_config (ChunkingConfig, optional): Custom chunking configuration
- •generate_embeddings (bool, default: True): Generate embeddings immediately
Returns:
- •DocumentAddResult with:
  - •document (Document): Created document object
  - •chunks_created (int): Number of chunks created
  - •embeddings_created (int): Number of embeddings generated
  - •processing_time_ms (float): Processing time in milliseconds
Raises:
- •ValidationError: If input validation fails (invalid owner_id, empty content, etc.)
- •DatabaseError: If database operation fails
- •EmbeddingError: If embedding generation fails
Example:
from llmemory import DocumentType
from datetime import datetime
result = await memory.add_document(
owner_id="workspace-1",
id_at_origin="user-123",
document_name="Q4 Report.pdf",
document_type=DocumentType.PDF,
content="Full document text here...",
document_date=datetime(2024, 10, 1),
metadata={
"category": "financial",
"department": "finance",
"confidential": False
}
)
print(f"Document ID: {result.document.document_id}")
print(f"Chunks: {result.chunks_created}")
print(f"Embeddings: {result.embeddings_created}")
print(f"Time: {result.processing_time_ms:.2f}ms")
search()
Search for documents.
Signature:
async def search(
owner_id: str,
query_text: str,
search_type: Union[SearchType, str] = SearchType.HYBRID,
limit: int = 10,
id_at_origin: Optional[str] = None,
id_at_origins: Optional[List[str]] = None,
metadata_filter: Optional[Dict[str, Any]] = None,
date_from: Optional[datetime] = None,
date_to: Optional[datetime] = None,
include_parent_context: bool = False,
context_window: int = 2,
alpha: float = 0.5,
query_expansion: Optional[bool] = None,
max_query_variants: Optional[int] = None,
rerank: Optional[bool] = None,
rerank_top_k: Optional[int] = None,
rerank_return_k: Optional[int] = None
) -> List[SearchResult]
Parameters:
- •owner_id (str, required): Owner identifier for filtering
- •query_text (str, required): Search query text
- •search_type (SearchType or str, default: HYBRID): Type of search to perform
- •limit (int, default: 10): Maximum number of results
- •id_at_origin (str, optional): Filter by single origin ID
- •id_at_origins (List[str], optional): Filter by multiple origin IDs
- •metadata_filter (Dict[str, Any], optional): Filter by metadata (e.g., {"category": "financial"})
- •date_from (datetime, optional): Start date filter
- •date_to (datetime, optional): End date filter
- •include_parent_context (bool, default: False): Include surrounding chunks
- •context_window (int, default: 2): Number of surrounding chunks to include
- •alpha (float, default: 0.5): Hybrid search weight (0=text only, 1=vector only)
- •query_expansion (bool, optional): Enable query expansion (None = follow config)
- •max_query_variants (int, optional): Max query variants for expansion
- •rerank (bool, optional): Enable reranking (None = follow config)
- •rerank_top_k (int, optional): Candidates for reranker
- •rerank_return_k (int, optional): Results after reranking
Returns:
- •List[SearchResult] where each result has:
  - •chunk_id (UUID): Chunk identifier
  - •document_id (UUID): Document identifier
  - •content (str): Chunk content
  - •metadata (Dict[str, Any]): Chunk metadata
  - •score (float): Overall relevance score
  - •similarity (float, optional): Vector similarity score
  - •text_rank (float, optional): Text search rank
  - •rrf_score (float, optional): Reciprocal Rank Fusion score
  - •rerank_score (float, optional): Reranker score (when reranking enabled)
  - •summary (str, optional): Chunk summary if available
  - •parent_chunks (List[DocumentChunk]): Surrounding chunks if requested
Raises:
- •ValidationError: If input validation fails
- •SearchError: If search operation fails
Example:
from datetime import datetime
from llmemory import SearchType
# Basic search
results = await memory.search(
owner_id="workspace-1",
query_text="quarterly revenue trends",
search_type=SearchType.HYBRID,
limit=5
)
for result in results:
    print(f"Score: {result.score:.3f}")
    print(f"Content: {result.content[:100]}...")
    print(f"Metadata: {result.metadata}")
    print("---")
# Advanced search with filters
results = await memory.search(
owner_id="workspace-1",
query_text="product launch strategy",
search_type=SearchType.HYBRID,
limit=10,
metadata_filter={"category": "strategy", "department": "product"},
date_from=datetime(2024, 1, 1),
date_to=datetime(2024, 12, 31),
alpha=0.7 # Favor vector search slightly
)
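The alpha parameter is documented only by its endpoints (0 = text only, 1 = vector only). One way to picture the values in between is a linear blend of the two scores, sketched below. This is an assumption for illustration, not a statement of how llmemory computes result.score. `blend_scores` is a hypothetical helper, and the presence of rrf_score on results suggests the library may fuse ranks instead.

```python
def blend_scores(vector_score: float, text_score: float, alpha: float = 0.5) -> float:
    """Linear interpolation between a text score (alpha=0) and a vector score (alpha=1)."""
    if not 0.0 <= alpha <= 1.0:
        raise ValueError("alpha must be between 0 and 1")
    return alpha * vector_score + (1.0 - alpha) * text_score

# alpha=0.7 weights the vector score more heavily, as in the example above.
blended = blend_scores(vector_score=0.8, text_score=0.2, alpha=0.7)
```

Under this reading, raising alpha helps paraphrased or conceptual queries, while lowering it helps queries that depend on exact terms such as identifiers or product names.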
search_with_documents()
Search and return results enriched with document metadata.
Signature:
async def search_with_documents(
owner_id: str,
query_text: str,
search_type: Union[SearchType, str] = SearchType.HYBRID,
limit: int = 10,
metadata_filter: Optional[Dict[str, Any]] = None,
include_document_metadata: bool = True
) -> SearchResultWithDocuments
Parameters:
- •owner_id (str, required): Owner identifier
- •query_text (str, required): Search query text
- •search_type (SearchType or str, default: HYBRID): Type of search
- •limit (int, default: 10): Maximum results
- •metadata_filter (Dict[str, Any], optional): Filter by metadata
- •include_document_metadata (bool, default: True): Include document-level metadata
Returns:
- •SearchResultWithDocuments with:
  - •results (List[EnrichedSearchResult]): Enriched search results
  - •total (int): Total number of results
EnrichedSearchResult fields:
- •All fields from SearchResult (chunk_id, content, score, etc.)
- •document_name (str): Name of the source document
- •document_type (str): Type of document
- •document_metadata (Dict[str, Any]): Document-level metadata
Raises:
- •ValidationError: If input validation fails
- •SearchError: If search operation fails
Example:
# Search with document context
results_with_docs = await memory.search_with_documents(
owner_id="workspace-1",
query_text="quarterly financial performance",
search_type=SearchType.HYBRID,
limit=10
)
print(f"Found {results_with_docs.total} results")
for result in results_with_docs.results:
    print(f"Document: {result.document_name}")
    print(f"Type: {result.document_type}")
    print(f"Score: {result.score:.3f}")
    print(f"Content: {result.content[:100]}...")
    print(f"Metadata: {result.document_metadata}")
    print("---")
When to use:
- •When you need document context along with search results
- •Building UI that shows source documents
- •Grouping results by document
- •When document metadata is needed for filtering or display
list_documents()
List documents with pagination and filtering.
Signature:
async def list_documents(
owner_id: str,
limit: int = 20,
offset: int = 0,
document_type: Optional[DocumentType] = None,
order_by: Literal["created_at", "updated_at", "document_name"] = "created_at",
order_desc: bool = True,
metadata_filter: Optional[Dict[str, Any]] = None
) -> DocumentListResult
Parameters:
- •owner_id (str, required): Owner identifier
- •limit (int, default: 20): Maximum documents to return
- •offset (int, default: 0): Number of documents to skip (for pagination)
- •document_type (DocumentType, optional): Filter by document type
- •order_by (str, default: "created_at"): Field to sort by
- •order_desc (bool, default: True): Sort descending
- •metadata_filter (Dict[str, Any], optional): Filter by metadata
Returns:
- •DocumentListResult with:
  - •documents (List[Document]): Document objects
  - •total (int): Total matching documents
  - •limit (int): Applied limit
  - •offset (int): Applied offset
Raises:
- •ValidationError: If parameters are invalid
Example:
# List recent documents
result = await memory.list_documents(
owner_id="workspace-1",
limit=20,
offset=0,
order_by="created_at",
order_desc=True
)
print(f"Total documents: {result.total}")
for doc in result.documents:
    print(f"{doc.document_name} - {doc.document_type.value}")
# Filter by type and metadata
result = await memory.list_documents(
owner_id="workspace-1",
document_type=DocumentType.PDF,
metadata_filter={"category": "financial"},
limit=50
)
get_document()
Retrieve a specific document with optional chunks.
Signature:
async def get_document(
owner_id: str,
document_id: Union[str, UUID],
include_chunks: bool = False,
include_embeddings: bool = False
) -> DocumentWithChunks
Parameters:
- •owner_id (str, required): Owner/workspace identifier (required for access control)
- •document_id (str or UUID, required): Document identifier
- •include_chunks (bool, default: False): Include all chunks for this document
- •include_embeddings (bool, default: False): Include embeddings with chunks (requires include_chunks=True)
Returns:
- •DocumentWithChunks with:
  - •document (Document): Document object
  - •chunks (List[DocumentChunk], optional): Chunks if requested
  - •chunk_count (int): Total number of chunks
Raises:
- •DocumentNotFoundError: If document doesn't exist
- •PermissionError: If the document belongs to a different owner
Example:
# Get document without chunks
doc_info = await memory.get_document(
owner_id="workspace-1",
document_id="uuid-here"
)
print(f"Document: {doc_info.document.document_name}")
print(f"Chunks: {doc_info.chunk_count}")
# Get document with all chunks
doc_with_chunks = await memory.get_document(
owner_id="workspace-1",
document_id="uuid-here",
include_chunks=True
)
for chunk in doc_with_chunks.chunks:
    print(f"Chunk {chunk.chunk_index}: {chunk.content[:50]}...")
get_document_chunks()
Get chunks for a specific document with pagination.
Signature:
async def get_document_chunks(
owner_id: str,
document_id: Union[str, UUID],
limit: Optional[int] = None,
offset: int = 0
) -> List[DocumentChunk]
Parameters:
- •owner_id (str, required): Owner/workspace identifier (required for access control)
- •document_id (str or UUID, required): Document identifier
- •limit (int, optional): Maximum number of chunks to return (None = all chunks)
- •offset (int, default: 0): Number of chunks to skip for pagination
Returns:
- •List[DocumentChunk]: List of chunks ordered by chunk_index
Raises:
- •DocumentNotFoundError: If document doesn't exist
- •PermissionError: If the document belongs to a different owner
- •ValidationError: If limit or offset is negative
Example:
# Get all chunks for a document
chunks = await memory.get_document_chunks(
owner_id="workspace-1",
document_id="uuid-here"
)
print(f"Total chunks: {len(chunks)}")
for chunk in chunks:
    print(f"Chunk {chunk.chunk_index}: {chunk.content[:50]}...")
# Paginated retrieval
page_size = 10
offset = 0
while True:
    chunks = await memory.get_document_chunks(
        owner_id="workspace-1",
        document_id="uuid-here",
        limit=page_size,
        offset=offset
    )
    if not chunks:
        break
    for chunk in chunks:
        print(f"Chunk {chunk.chunk_index}: {chunk.content}")
    offset += page_size
When to use:
- •Accessing document chunks without full document
- •Paginating through large documents
- •Processing chunks in batches
- •Inspecting chunking results
get_chunk_count()
Get the number of chunks for a document.
Signature:
async def get_chunk_count(
owner_id: str,
document_id: Union[str, UUID]
) -> int
Parameters:
- •owner_id (str, required): Owner/workspace identifier (required for access control)
- •document_id (str or UUID, required): Document identifier
Returns:
- •int: Number of chunks for the document
Raises:
- •DocumentNotFoundError: If document doesn't exist
- •PermissionError: If the document belongs to a different owner
Example:
# Get chunk count
count = await memory.get_chunk_count(owner_id="workspace-1", document_id="uuid-here")
print(f"Document has {count} chunks")
# Check if document needs re-chunking
if count > 1000:
    print("Warning: Very large document, consider splitting")
elif count == 0:
    print("Warning: Document has no chunks")
When to use:
- •Quick check of document size
- •Validating chunking results
- •Deciding pagination strategy
- •Monitoring document processing
delete_document()
Delete a single document and all its chunks.
Signature:
async def delete_document(
owner_id: str,
document_id: Union[UUID, str]
) -> None
Parameters:
- •owner_id (str, required): Owner/workspace identifier (required for access control)
- •document_id (UUID or str, required): Document ID to delete
Raises:
- •ResourceNotFoundError: If document not found
- •PermissionError: If the document belongs to a different owner
- •DatabaseError: If deletion fails
Example:
await memory.delete_document(owner_id="workspace-1", document_id="uuid-here")
delete_documents()
Delete multiple documents.
Signature:
async def delete_documents(
owner_id: str,
document_ids: Optional[List[Union[str, UUID]]] = None,
metadata_filter: Optional[Dict[str, Any]] = None
) -> DeleteResult
Parameters:
- •owner_id (str, required): Owner identifier (safety check)
- •document_ids (List[UUID or str], optional): Specific documents to delete
- •metadata_filter (Dict[str, Any], optional): Delete all documents whose metadata matches this filter
Returns:
- •DeleteResult with:
  - •deleted_count (int): Number of documents deleted
  - •deleted_document_ids (List[UUID]): IDs of deleted documents
Raises:
- •ValueError: If neither document_ids nor metadata_filter is provided
- •ValidationError: If owner_id is invalid
Example:
# Delete specific documents
result = await memory.delete_documents(
owner_id="workspace-1",
document_ids=["uuid-1", "uuid-2", "uuid-3"]
)
print(f"Deleted {result.deleted_count} documents")
# Delete by metadata
result = await memory.delete_documents(
owner_id="workspace-1",
metadata_filter={"category": "temp", "delete_after": "2024-01-01"}
)
get_statistics()
Get statistics for an owner's documents.
Signature:
async def get_statistics(
owner_id: str,
include_breakdown: bool = False
) -> OwnerStatistics
Parameters:
- •owner_id (str, required): Owner identifier
- •include_breakdown (bool, default: False): Include breakdown by document type
Returns:
- •OwnerStatistics with:
  - •document_count (int): Total documents
  - •chunk_count (int): Total chunks
  - •total_size_bytes (int): Estimated total size
  - •document_type_breakdown (Dict[DocumentType, int], optional): Count by type
  - •created_date_range (Tuple[datetime, datetime], optional): Date range
Example:
stats = await memory.get_statistics(
owner_id="workspace-1",
include_breakdown=True
)
print(f"Documents: {stats.document_count}")
print(f"Chunks: {stats.chunk_count}")
print(f"Size: {stats.total_size_bytes / 1024 / 1024:.2f} MB")
if stats.document_type_breakdown:
    for doc_type, count in stats.document_type_breakdown.items():
        print(f"  {doc_type.value}: {count}")
Common Patterns
Async Context Manager (Recommended)
async with LLMemory(connection_string="postgresql://localhost/mydb") as memory:
    # Add documents
    await memory.add_document(...)
    # Search
    results = await memory.search(...)
# Automatically closed
Batch Document Processing
documents = [
{"name": "doc1.txt", "content": "..."},
{"name": "doc2.txt", "content": "..."},
{"name": "doc3.txt", "content": "..."},
]
for doc in documents:
    result = await memory.add_document(
        owner_id="workspace-1",
        id_at_origin="batch-import",
        document_name=doc["name"],
        document_type=DocumentType.TEXT,
        content=doc["content"]
    )
    print(f"Added {doc['name']}: {result.chunks_created} chunks")
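The loop above adds documents one at a time. If throughput matters, the same batch can be run with bounded concurrency, sketched here with asyncio.Semaphore and asyncio.gather. `fake_add_document` is a hypothetical stand-in so the example runs without a database. Whether concurrent add_document() calls are safe for a given pool size and embedding rate limit is an assumption to verify first.

```python
import asyncio

active = 0  # calls currently in flight
peak = 0    # highest number of simultaneous calls observed

async def fake_add_document(name: str, content: str) -> dict:
    """Stand-in for memory.add_document(); sleeps instead of doing real work."""
    global active, peak
    active += 1
    peak = max(peak, active)
    await asyncio.sleep(0.01)
    active -= 1
    return {"name": name, "chunks_created": max(1, len(content) // 10)}

async def add_all(documents: list[dict], max_concurrency: int = 2) -> list[dict]:
    semaphore = asyncio.Semaphore(max_concurrency)

    async def add_one(doc: dict) -> dict:
        async with semaphore:  # at most max_concurrency calls run at once
            return await fake_add_document(doc["name"], doc["content"])

    # gather() returns results in the same order as the input documents.
    return await asyncio.gather(*(add_one(doc) for doc in documents))

documents = [{"name": f"doc{i}.txt", "content": "x" * (10 * i)} for i in range(1, 6)]
results = asyncio.run(add_all(documents))
```

Keeping max_concurrency at or below the connection pool size avoids tasks queuing for connections while embedding requests are in flight.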
Filtered Search with Metadata
# Add document with metadata
await memory.add_document(
owner_id="workspace-1",
id_at_origin="user-123",
document_name="report.pdf",
document_type=DocumentType.PDF,
content="...",
metadata={
"category": "financial",
"year": 2024,
"quarter": "Q4",
"confidential": False
}
)
# Search with metadata filter
results = await memory.search(
owner_id="workspace-1",
query_text="revenue analysis",
metadata_filter={
"category": "financial",
"year": 2024
},
limit=10
)
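The filter in this pattern reads as "every key in metadata_filter must be present with an equal value". The sketch below spells out that interpretation in plain Python. It is an assumption about the matching semantics, since the documentation does not state them, and `matches_filter` is a hypothetical helper rather than a library function.

```python
from typing import Any

def matches_filter(metadata: dict[str, Any], metadata_filter: dict[str, Any]) -> bool:
    """True when every filter key exists in metadata with an equal value (AND semantics)."""
    return all(
        key in metadata and metadata[key] == value
        for key, value in metadata_filter.items()
    )

doc_metadata = {"category": "financial", "year": 2024, "quarter": "Q4", "confidential": False}

hit = matches_filter(doc_metadata, {"category": "financial", "year": 2024})
miss = matches_filter(doc_metadata, {"category": "financial", "year": 2023})
```

Under this reading, extra keys on the document (such as "quarter") do not prevent a match, and an empty filter matches every document.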
Paginated Document Listing
page_size = 20
offset = 0
while True:
    result = await memory.list_documents(
        owner_id="workspace-1",
        limit=page_size,
        offset=offset
    )
    if not result.documents:
        break
    for doc in result.documents:
        print(f"{doc.document_name}: {doc.chunk_count} chunks")
    offset += page_size
    if offset >= result.total:
        break
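The limit/offset loop above can be wrapped once in an async generator so callers simply iterate. This is a sketch under stated assumptions: `iter_documents`, `Page`, and `fake_list_documents` are hypothetical names, and the fake only mimics the documents and total fields of DocumentListResult so the example runs without a database.

```python
import asyncio
from dataclasses import dataclass

@dataclass
class Page:
    """Minimal stand-in for DocumentListResult."""
    documents: list[str]
    total: int

async def fake_list_documents(owner_id: str, limit: int, offset: int) -> Page:
    all_docs = [f"doc-{i}" for i in range(45)]
    return Page(documents=all_docs[offset:offset + limit], total=len(all_docs))

async def iter_documents(list_page, owner_id: str, page_size: int = 20):
    """Yield documents one by one, fetching a new page only when needed."""
    offset = 0
    while True:
        page = await list_page(owner_id=owner_id, limit=page_size, offset=offset)
        for doc in page.documents:
            yield doc
        offset += len(page.documents)
        # Stop on an empty page or once every matching document has been seen.
        if not page.documents or offset >= page.total:
            break

async def collect() -> list[str]:
    return [doc async for doc in iter_documents(fake_list_documents, "workspace-1")]

docs = asyncio.run(collect())
```

With a real instance, memory.list_documents could be passed as list_page, since it accepts the same owner_id, limit, and offset keyword arguments.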
Exception Reference
All llmemory exceptions inherit from the LLMemoryError base class.
Exception Hierarchy
LLMemoryError (base)
├── ConfigurationError
├── ValidationError
├── DatabaseError
│   └── ConnectionError
├── EmbeddingError
├── SearchError
├── ChunkingError
├── ResourceNotFoundError
│   └── DocumentNotFoundError
├── RateLimitError
└── PermissionError
LLMemoryError
Base exception for all llmemory errors.
When raised: Never raised directly; use specific subclasses.
Usage:
from llmemory import LLMemoryError
try:
    await memory.search(...)
except LLMemoryError as e:
    # Catches all llmemory exceptions
    print(f"LLMemory error: {e}")
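Because the exceptions form a hierarchy, the order of except clauses matters: a handler for a parent class also catches its subclasses. The sketch below uses stand-in classes that mirror part of the documented hierarchy so it runs without llmemory installed. The class bodies are placeholders, not the library's definitions.

```python
# Stand-ins mirroring part of the documented hierarchy (not the real classes).
class LLMemoryError(Exception): ...
class DatabaseError(LLMemoryError): ...
class ResourceNotFoundError(LLMemoryError): ...
class DocumentNotFoundError(ResourceNotFoundError): ...

def describe(exc: Exception) -> str:
    """Route an exception to the most specific matching handler."""
    try:
        raise exc
    except DocumentNotFoundError:
        return "document missing"      # most specific first
    except ResourceNotFoundError:
        return "resource missing"      # parent of DocumentNotFoundError
    except LLMemoryError:
        return "other llmemory error"  # catch-all for the library

labels = [
    describe(DocumentNotFoundError("doc")),
    describe(ResourceNotFoundError("chunk")),
    describe(DatabaseError("connection lost")),
]
```

If the LLMemoryError clause came first, it would catch everything and the more specific handlers would never run.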
ConfigurationError
Configuration is invalid or incomplete.
Common causes:
- •Missing required configuration (connection_string, API key)
- •Invalid configuration values (negative pool size, invalid dimensions)
- •Incompatible configuration combinations
When raised:
- •During LLMemory() initialization if neither connection_string nor db_manager is provided
- •During initialize() if config validation fails
- •When embedding provider configuration is invalid
Example:
from llmemory import ConfigurationError, LLMemory
try:
    # Missing connection_string
    memory = LLMemory()  # Raises ConfigurationError
except ConfigurationError as e:
    print(f"Invalid configuration: {e}")
ValidationError
Input validation failed.
Common causes:
- •owner_id is too long or contains invalid characters
- •Content is empty or too long
- •Invalid document_name
- •Negative limit or offset values
When raised:
- •During add_document() if owner_id, id_at_origin, or content is invalid
- •During search() if owner_id or query_text is invalid
- •During list_documents() if pagination parameters are invalid
Example:
from llmemory import DocumentType, ValidationError
try:
    await memory.add_document(
        owner_id="",  # Empty owner_id - invalid
        id_at_origin="user-123",
        document_name="doc.txt",
        document_type=DocumentType.TEXT,
        content="content"
    )
except ValidationError as e:
    print(f"Validation failed: {e}")
    # Output: "Validation failed: owner_id cannot be empty"
DatabaseError
Database operation failed.
Common causes:
- •Connection to PostgreSQL failed
- •Query execution failed
- •Transaction rollback
- •Schema migration failed
When raised:
- •During initialize() if database setup fails
- •During any CRUD operation if database query fails
- •During add_document() if insert fails
Example:
from llmemory import DatabaseError
try:
    await memory.add_document(...)
except DatabaseError as e:
    print(f"Database error: {e}")
    # Possible causes: connection lost, disk full, constraint violation
ConnectionError
Cannot connect to database (subclass of DatabaseError).
Common causes:
- •PostgreSQL not running
- •Wrong connection string
- •Network issues
- •Firewall blocking connection
When raised:
- •During initialize() if connection fails
- •During operations if connection is lost
Example:
# Note: this import shadows Python's built-in ConnectionError in this module
from llmemory import ConnectionError, LLMemory
try:
    memory = LLMemory(connection_string="postgresql://invalid:5432/db")
    await memory.initialize()
except ConnectionError as e:
    print(f"Cannot connect to database: {e}")
EmbeddingError
Embedding generation failed.
Common causes:
- •OpenAI API key invalid or missing
- •OpenAI rate limit exceeded
- •Local embedding model failed to load
- •Invalid embedding dimensions
When raised:
- •During add_document() if generate_embeddings=True and embedding fails
- •During process_pending_embeddings() if batch processing fails
Example:
from llmemory import DocumentType, EmbeddingError
try:
    await memory.add_document(
        owner_id="workspace-1",
        id_at_origin="user-123",
        document_name="doc.txt",
        document_type=DocumentType.TEXT,
        content="content",
        generate_embeddings=True  # Will fail if no API key
    )
except EmbeddingError as e:
    print(f"Embedding generation failed: {e}")
SearchError
Search operation failed.
Common causes:
- •Invalid search query syntax
- •Vector index not built
- •Embedding provider not configured for vector search
- •Search timeout exceeded
When raised:
- •During search() if query execution fails
- •During vector search if embeddings table doesn't exist
- •During hybrid search if either vector or text search fails
Example:
from llmemory import SearchError, SearchType
try:
    results = await memory.search(
        owner_id="workspace-1",
        query_text="test",
        search_type=SearchType.VECTOR  # Fails if no embeddings
    )
except SearchError as e:
    print(f"Search failed: {e}")
ChunkingError
Document chunking failed.
Common causes:
- •Invalid chunking configuration
- •Document too large to chunk
- •Chunking strategy not supported for document type
When raised:
- •During add_document() if chunking fails
- •During process_document() if chunker fails
Example:
from llmemory import ChunkingError, DocumentType
try:
    await memory.add_document(
        owner_id="workspace-1",
        id_at_origin="user-123",
        document_name="huge.txt",
        document_type=DocumentType.TEXT,
        content="x" * 100_000_000  # Too large
    )
except ChunkingError as e:
    print(f"Chunking failed: {e}")
ResourceNotFoundError
Requested resource doesn't exist.
Common causes:
- •Document ID doesn't exist
- •Chunk ID not found
- •Owner has no documents
When raised:
- •During delete_document() if document not found
- •During get_document() if document doesn't exist
DocumentNotFoundError
Specific document doesn't exist (subclass of ResourceNotFoundError).
When raised:
- •During get_document() if document_id doesn't exist
- •During delete_document() if document not found
Example:
from uuid import UUID
from llmemory import DocumentNotFoundError
try:
    doc = await memory.get_document(
        owner_id="workspace-1",
        document_id=UUID("00000000-0000-0000-0000-000000000000")
    )
except DocumentNotFoundError as e:
    print(f"Document not found: {e}")
RateLimitError
API rate limit exceeded.
Common causes:
- •OpenAI API rate limit hit
- •Too many embedding requests in short time
- •Exceeded configured rate limits
When raised:
- •During embedding generation if API rate limited
- •During query expansion if LLM API rate limited
Example:
import asyncio
from llmemory import RateLimitError
try:
    # Adding many documents in a row can hit the provider's rate limit
    for doc in documents:
        await memory.add_document(...)
except RateLimitError as e:
    print(f"Rate limited: {e}")
    await asyncio.sleep(60)  # Wait before retry
PermissionError
Permission denied for operation.
Common causes:
- •Attempting to access document owned by different owner_id
- •Database permission denied
When raised:
- •During operations if user doesn't have permission
- •During delete if document belongs to different owner
Example:
# Alias the import so it does not shadow Python's built-in PermissionError
from llmemory import PermissionError as LLMemoryPermissionError
try:
    # Trying to access another owner's document
    doc = await memory.get_document(owner_id="workspace-1", document_id="...")
except LLMemoryPermissionError as e:
    print(f"Permission denied: {e}")
Error Handling Patterns
Basic Error Handling
import asyncio
from llmemory import (
    LLMemory, DocumentType,
    LLMemoryError, ConfigurationError, ValidationError, DatabaseError,
    DocumentNotFoundError, EmbeddingError, SearchError, ChunkingError,
    ResourceNotFoundError, RateLimitError, ConnectionError
)
memory = None  # Stays None if the constructor itself raises
try:
    memory = LLMemory(connection_string="postgresql://localhost/mydb")
    await memory.initialize()
    result = await memory.add_document(
        owner_id="workspace-1",
        id_at_origin="user-123",
        document_name="test.txt",
        document_type=DocumentType.TEXT,
        content="Test content"
    )
    results = await memory.search(
        owner_id="workspace-1",
        query_text="test query"
    )
except ConfigurationError as e:
    print(f"Configuration error: {e}")
except ValidationError as e:
    print(f"Validation error: {e}")
except ConnectionError as e:
    # Must come before DatabaseError, its parent class
    print(f"Cannot connect to database: {e}")
except DatabaseError as e:
    print(f"Database error: {e}")
except DocumentNotFoundError as e:
    print(f"Document not found: {e}")
except EmbeddingError as e:
    print(f"Embedding error: {e}")
except SearchError as e:
    print(f"Search error: {e}")
except ChunkingError as e:
    print(f"Chunking error: {e}")
except RateLimitError as e:
    print(f"Rate limit hit: {e}")
    await asyncio.sleep(60)  # Wait before retry
except LLMemoryError as e:
    print(f"Unexpected llmemory error: {e}")
finally:
    if memory is not None:
        await memory.close()
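The order of the except clauses above matters: ConnectionError is documented as a subclass of DatabaseError, so the more specific clause has to come first. The sketch below illustrates this with stand-in classes defined locally (they are not imported from llmemory, and it assumes LLMemoryError is the common base class, as the catch-all clause suggests).

```python
# Stand-in classes mirroring the hierarchy described in this document.
# They are illustrative only and are not llmemory's real classes.
class LLMemoryError(Exception):
    pass


class DatabaseError(LLMemoryError):
    pass


class DbConnectionError(DatabaseError):
    pass


def classify(exc: Exception) -> str:
    """Most specific clause first: each error reaches its own handler."""
    try:
        raise exc
    except DbConnectionError:
        return "connection"
    except DatabaseError:
        return "database"
    except LLMemoryError:
        return "llmemory"


def classify_wrong_order(exc: Exception) -> str:
    """Parent clause first: the subclass handler can never run."""
    try:
        raise exc
    except DatabaseError:
        return "database"
    except DbConnectionError:  # Unreachable, already matched above
        return "connection"


print(classify(DbConnectionError("refused")))              # connection
print(classify_wrong_order(DbConnectionError("refused")))  # database
```

Python picks the first matching clause, so a parent class listed early silently absorbs every subclass below it.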
Granular Error Handling
# Handle specific errors differently (inside a request handler function)
import logging
logger = logging.getLogger(__name__)
try:
    result = await memory.add_document(...)
except ValidationError as e:
    # User input error - return 400
    return {"error": str(e), "code": 400}
except EmbeddingError as e:
    # Embedding failed but document added - return partial success
    logger.error(f"Embedding failed: {e}")
    return {"warning": "Document added but embeddings pending", "code": 202}
except DatabaseError as e:
    # System error - return 500
    logger.error(f"Database error: {e}")
    return {"error": "Internal server error", "code": 500}
Retry Logic for Transient Errors
import asyncio
from llmemory import RateLimitError, ConnectionError
async def robust_search(memory, owner_id, query, max_retries=3):
    """Search with retry logic for transient errors."""
    for attempt in range(max_retries):
        try:
            return await memory.search(
                owner_id=owner_id,
                query_text=query
            )
        except RateLimitError:
            if attempt < max_retries - 1:
                await asyncio.sleep(2 ** attempt)  # Exponential backoff
                continue
            raise
        except ConnectionError:
            if attempt < max_retries - 1:
                await asyncio.sleep(1)
                continue
            raise
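The same retry pattern can be factored into a reusable helper. The following is a minimal sketch, not part of llmemory: the name retry_async and its parameters are hypothetical, and it adds a delay cap and random jitter on top of the exponential backoff shown above.

```python
import asyncio
import random
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


async def retry_async(
    operation: Callable[[], Awaitable[T]],
    retry_on: Tuple[Type[BaseException], ...],
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
) -> T:
    """Await operation(), retrying only on the given exception types."""
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")
    for attempt in range(max_retries):
        try:
            return await operation()
        except retry_on:
            if attempt == max_retries - 1:
                raise  # Out of attempts: surface the last error
            # Exponential backoff capped at max_delay, with full jitter
            delay = min(max_delay, base_delay * 2 ** attempt)
            await asyncio.sleep(random.uniform(0, delay))
    raise AssertionError("unreachable")
```

With llmemory this might be called as retry_async(lambda: memory.search(owner_id=owner_id, query_text=query), retry_on=(RateLimitError, ConnectionError)). Errors that are not listed in retry_on, such as validation failures, propagate immediately instead of being retried.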
Complete Environment Variable Reference
Database Configuration
DATABASE_URL=postgresql://localhost/mydb  # PostgreSQL connection string
LLMEMORY_DB_MIN_POOL_SIZE=5  # Minimum connection pool size (default: 5)
LLMEMORY_DB_MAX_POOL_SIZE=20  # Maximum connection pool size (default: 20)
Embedding Configuration
# Provider selection
OPENAI_API_KEY=sk-...  # OpenAI API key (required for OpenAI embeddings)
LLMEMORY_EMBEDDING_PROVIDER=openai  # Provider: "openai" or "local-minilm" (default: "openai")
# Local embedding models
LLMEMORY_LOCAL_MODEL=all-MiniLM-L6-v2  # Local model name (default: all-MiniLM-L6-v2)
LLMEMORY_LOCAL_DEVICE=cpu  # Device: "cpu" or "cuda" (default: cpu)
LLMEMORY_LOCAL_CACHE_DIR=/path/to/cache  # Cache directory for local models
Search Configuration
# HNSW Index tuning
LLMEMORY_HNSW_PROFILE=balanced  # Profile: "fast", "balanced", "accurate" (default: balanced)
# Search defaults
LLMEMORY_DEFAULT_SEARCH_TYPE=hybrid  # Default search type (default: hybrid)
LLMEMORY_SEARCH_CACHE_TTL=300  # Search cache TTL in seconds (default: 300)
Query Expansion Configuration
LLMEMORY_ENABLE_QUERY_EXPANSION=1  # Enable query expansion: 1 or 0 (default: 0)
LLMEMORY_MAX_QUERY_VARIANTS=3  # Max query variants to generate (default: 3)
Reranking Configuration
LLMEMORY_ENABLE_RERANK=1  # Enable reranking: 1 or 0 (default: 0)
LLMEMORY_RERANK_PROVIDER=openai  # Provider: "openai", "lexical" (default: lexical)
LLMEMORY_RERANK_MODEL=gpt-4.1-mini  # Reranking model name
LLMEMORY_RERANK_TOP_K=50  # Candidates to consider (default: 50)
LLMEMORY_RERANK_RETURN_K=15  # Results to return after reranking (default: 15)
LLMEMORY_RERANK_DEVICE=cpu  # Device for local rerankers: "cpu" or "cuda"
LLMEMORY_RERANK_BATCH_SIZE=16  # Batch size for local reranking (default: 16)
Chunking Configuration
LLMEMORY_ENABLE_CHUNK_SUMMARIES=1  # Enable chunk summaries: 1 or 0 (default: 0)
Feature Flags
LLMEMORY_DISABLE_CACHING=1  # Disable search caching (default: enabled)
LLMEMORY_DISABLE_METRICS=1  # Disable Prometheus metrics (default: enabled)
Logging
LLMEMORY_LOG_LEVEL=INFO # Log level: DEBUG, INFO, WARNING, ERROR (default: INFO)
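Application code that wants to read the same variables for its own purposes (for example, to log the effective settings) could parse them as sketched below. The helpers env_flag and env_int are hypothetical and are not llmemory's parser; in particular, this document does not say which values llmemory accepts besides 1 and 0, so the sketch rejects anything else.

```python
import os
from typing import Mapping, Optional


def env_flag(name: str, default: bool = False,
             environ: Optional[Mapping[str, str]] = None) -> bool:
    """Read a 1/0 style flag; unset or empty values fall back to default."""
    env = os.environ if environ is None else environ
    raw = env.get(name, "").strip()
    if raw == "":
        return default
    if raw not in ("0", "1"):
        raise ValueError(f"{name} must be 0 or 1, got {raw!r}")
    return raw == "1"


def env_int(name: str, default: int,
            environ: Optional[Mapping[str, str]] = None) -> int:
    """Read an integer setting; unset or empty values fall back to default."""
    env = os.environ if environ is None else environ
    raw = env.get(name, "").strip()
    return int(raw) if raw else default


# A plain dict stands in for os.environ so the example is reproducible
example_env = {"LLMEMORY_ENABLE_RERANK": "1", "LLMEMORY_RERANK_TOP_K": "25"}
print(env_flag("LLMEMORY_ENABLE_RERANK", environ=example_env))           # True
print(env_flag("LLMEMORY_ENABLE_QUERY_EXPANSION", environ=example_env))  # False
print(env_int("LLMEMORY_RERANK_TOP_K", 50, environ=example_env))         # 25
print(env_int("LLMEMORY_RERANK_RETURN_K", 15, environ=example_env))      # 15
```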
Complete Configuration Reference
LLMemoryConfig
Main configuration class containing all subsystem configurations.
Constructor:
LLMemoryConfig(
    embedding: EmbeddingConfig = EmbeddingConfig(),
    chunking: ChunkingConfig = ChunkingConfig(),
    search: SearchConfig = SearchConfig(),
    database: DatabaseConfig = DatabaseConfig(),
    validation: ValidationConfig = ValidationConfig(),
    enable_caching: bool = True,
    enable_metrics: bool = True,
    enable_background_processing: bool = True,
    log_level: str = "INFO",
    log_slow_queries: bool = True,
    slow_query_threshold: float = 1.0
)
Creating and using config:
from llmemory import LLMemory, LLMemoryConfig
# Use default configuration
config = LLMemoryConfig()
# Modify specific settings
config.embedding.default_provider = "openai"
config.chunking.default_parent_size = 1000
config.search.enable_query_expansion = True
# Use with LLMemory
memory = LLMemory(
    connection_string="postgresql://localhost/mydb",
    config=config
)
Loading from environment:
# Automatically reads from environment variables
config = LLMemoryConfig.from_env()
memory = LLMemory(connection_string="...", config=config)
EmbeddingConfig
Configuration for embedding generation.
Fields:
- •default_provider (str, default: "openai"): Default embedding provider
- •providers (Dict[str, EmbeddingProviderConfig]): Available providers
- •auto_create_tables (bool, default: True): Auto-create provider tables
Example:
config = LLMemoryConfig()
config.embedding.default_provider = "local-minilm"
EmbeddingProviderConfig
Configuration for a single embedding provider.
Fields:
- •provider_type (str): "openai" or "local"
- •model_name (str): Model name
- •dimension (int): Embedding dimensions
- •api_key (Optional[str]): API key (for OpenAI)
- •device (str, default: "cpu"): Device for local models ("cpu" or "cuda")
- •cache_dir (Optional[str]): Cache directory for local models
- •batch_size (int, default: 100): Batch size for processing
- •max_retries (int, default: 3): Max retries on failure
- •retry_delay (float, default: 1.0): Delay between retries in seconds
- •timeout (float, default: 30.0): Request timeout in seconds
- •max_tokens_per_minute (int, default: 1,000,000): Rate limit for tokens
- •max_requests_per_minute (int, default: 3,000): Rate limit for requests
ChunkingConfig
Configuration for document chunking (in config.py).
Fields:
- •enable_chunk_summaries (bool, default: False): Generate summaries for chunks
- •summary_max_tokens (int, default: 120): Max tokens for summaries
- •min_chunk_size (int, default: 50): Minimum chunk size in tokens
- •max_chunk_size (int, default: 2000): Maximum chunk size in tokens
- •enable_contextual_retrieval (bool, default: False): Prepend document context to chunks before embedding (Anthropic's approach)
- •context_template (str): Template for contextual retrieval format (default: "Document: {document_name}\nType: {document_type}\n\n{content}")
Contextual Retrieval Example:
config = LLMemoryConfig()
config.chunking.enable_contextual_retrieval = True
memory = LLMemory(connection_string="...", config=config)
# Chunks are embedded with document context prepended:
# "Document: Q3 Report\nType: report\n\nRevenue increased 15%"
#
# But chunk.content remains original for display:
# "Revenue increased 15%"
await memory.add_document(
    owner_id="workspace-1",
    id_at_origin="kb",
    document_name="Q3 Report",
    document_type=DocumentType.REPORT,
    content="Revenue increased 15% QoQ..."
)
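To make the effect of context_template concrete, the sketch below fills the default template with plain str.format. The function build_embedding_text is a hypothetical illustration; how llmemory applies the template internally is an assumption here.

```python
# Default template as documented for ChunkingConfig.context_template
CONTEXT_TEMPLATE = "Document: {document_name}\nType: {document_type}\n\n{content}"


def build_embedding_text(document_name: str, document_type: str, content: str,
                         template: str = CONTEXT_TEMPLATE) -> str:
    """Return the text that would be embedded for one chunk (illustrative)."""
    return template.format(
        document_name=document_name,
        document_type=document_type,
        content=content,
    )


chunk_content = "Revenue increased 15%"
embedding_text = build_embedding_text("Q3 Report", "report", chunk_content)

print(repr(embedding_text))
# 'Document: Q3 Report\nType: report\n\nRevenue increased 15%'
print(repr(chunk_content))
# 'Revenue increased 15%'
```

Only the embedded text carries the prefix; the stored chunk content stays unchanged, which matches the behaviour described in the comments above.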
Chunk Summaries Example:
config = LLMemoryConfig()
config.chunking.enable_chunk_summaries = True
config.chunking.summary_max_tokens = 100
SearchConfig
Configuration for search operations.
Fields:
- •default_limit (int, default: 10): Default result limit
- •max_limit (int, default: 100): Maximum allowed limit
- •default_search_type (str, default: "hybrid"): Default search type
- •hnsw_profile (str, default: "balanced"): HNSW index profile
- •rrf_k (int, default: 50): RRF constant for fusion
- •enable_query_expansion (bool, default: False): Enable query expansion
- •max_query_variants (int, default: 3): Max query variants
- •query_expansion_model (Optional[str]): Model for expansion
- •include_keyword_variant (bool, default: True): Include keyword variant
- •enable_rerank (bool, default: False): Enable reranking
- •default_rerank_model (Optional[str]): Reranking model
- •rerank_provider (str, default: "lexical"): Reranker provider
- •rerank_top_k (int, default: 50): Candidates for reranking
- •rerank_return_k (int, default: 15): Results after reranking
- •rerank_device (Optional[str]): Device for local rerankers
- •rerank_batch_size (int, default: 16): Batch size for reranking
- •hnsw_ef_search (int, default: 100): HNSW ef_search parameter
- •vector_search_limit (int, default: 100): Internal vector search limit
- •text_search_limit (int, default: 100): Internal text search limit
- •cache_ttl (int, default: 3600): Cache TTL in seconds
- •cache_max_size (int, default: 10000): Max cache entries
- •search_timeout (float, default: 5.0): Search timeout in seconds
- •min_score_threshold (float, default: 0.0): Minimum score threshold
Example:
config = LLMemoryConfig()
config.search.enable_query_expansion = True
config.search.enable_rerank = True
config.search.rerank_provider = "openai"
config.search.hnsw_profile = "accurate"
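The rrf_k field refers to Reciprocal Rank Fusion, the usual way to merge a vector ranking and a text ranking in hybrid search. The sketch below shows the standard formula, where each result scores the sum of 1 / (k + rank) over the lists it appears in. Whether llmemory computes its fused score exactly this way is an assumption; the function name and chunk IDs are illustrative.

```python
from collections import defaultdict
from typing import Dict, List, Sequence, Tuple


def reciprocal_rank_fusion(rankings: Sequence[Sequence[str]],
                           k: int = 50) -> List[Tuple[str, float]]:
    """Fuse ranked ID lists: score(id) = sum over lists of 1 / (k + rank)."""
    scores: Dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, chunk_id in enumerate(ranking, start=1):  # Ranks start at 1
            scores[chunk_id] += 1.0 / (k + rank)
    # Highest score first; ties broken by ID for a stable order
    return sorted(scores.items(), key=lambda item: (-item[1], item[0]))


vector_hits = ["c1", "c2", "c3"]  # Best first, from vector search
text_hits = ["c3", "c1", "c4"]    # Best first, from full-text search
fused = reciprocal_rank_fusion([vector_hits, text_hits], k=50)

print([chunk_id for chunk_id, _ in fused])
# ['c1', 'c3', 'c2', 'c4']
```

A larger k flattens the difference between top and lower ranks, so results that appear in both lists gain relative to results ranked first in only one list.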
DatabaseConfig
Configuration for database operations.
Fields:
- •min_pool_size (int, default: 5): Minimum connection pool size
- •max_pool_size (int, default: 20): Maximum connection pool size
- •connection_timeout (float, default: 10.0): Connection timeout in seconds
- •command_timeout (float, default: 30.0): Command timeout in seconds
- •schema_name (str, default: "llmemory"): PostgreSQL schema name
- •documents_table (str, default: "documents"): Documents table name
- •chunks_table (str, default: "document_chunks"): Chunks table name
- •embeddings_queue_table (str, default: "embedding_queue"): Queue table name
- •search_history_table (str, default: "search_history"): Search history table
- •embedding_providers_table (str, default: "embedding_providers"): Providers table
- •chunk_embeddings_prefix (str, default: "chunk_embeddings_"): Embedding table prefix
- •hnsw_index_name (str, default: "document_chunks_embedding_hnsw"): HNSW index name
- •hnsw_m (int, default: 16): HNSW M parameter
- •hnsw_ef_construction (int, default: 200): HNSW ef_construction parameter
Example:
config = LLMemoryConfig()
config.database.schema_name = "my_app_llmemory"
config.database.min_pool_size = 10
config.database.max_pool_size = 50
ValidationConfig
Configuration for input validation.
Fields:
- •max_owner_id_length (int, default: 255): Max owner_id length
- •max_id_at_origin_length (int, default: 255): Max id_at_origin length
- •max_document_name_length (int, default: 500): Max document name length
- •max_content_length (int, default: 10,000,000): Max content length (10MB)
- •max_metadata_size (int, default: 65536): Max metadata size (64KB)
- •min_content_length (int, default: 10): Minimum content length
- •valid_owner_id_pattern (str): Regex for valid owner_id
- •valid_id_at_origin_pattern (str): Regex for valid id_at_origin
Example:
config = LLMemoryConfig()
config.validation.max_content_length = 20_000_000  # 20MB
config.validation.min_content_length = 50  # Require at least 50 chars
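To show how limits like these typically translate into checks, here is a minimal sketch of a pre-flight validator an application could run before calling add_document(). It is not llmemory's validator: the Limits class and validate_inputs are hypothetical, the owner_id regex is an invented placeholder (the library's real pattern is not shown in this document), and lengths are assumed to be counted in characters.

```python
import re
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Limits:
    max_owner_id_length: int = 255
    max_content_length: int = 10_000_000
    min_content_length: int = 10
    # Placeholder pattern; the library's actual regex is an unknown here
    valid_owner_id_pattern: str = r"^[A-Za-z0-9_\-.]+$"


def validate_inputs(owner_id: str, content: str,
                    limits: Limits = Limits()) -> List[str]:
    """Return a list of problems; an empty list means the input passes."""
    errors: List[str] = []
    if not owner_id:
        errors.append("owner_id cannot be empty")
    elif len(owner_id) > limits.max_owner_id_length:
        errors.append("owner_id too long")
    elif not re.fullmatch(limits.valid_owner_id_pattern, owner_id):
        errors.append("owner_id contains invalid characters")
    if len(content) < limits.min_content_length:
        errors.append("content too short")
    elif len(content) > limits.max_content_length:
        errors.append("content too long")
    return errors


print(validate_inputs("workspace-1", "A sufficiently long document body."))
# []
print(validate_inputs("bad id!", "short"))
# ['owner_id contains invalid characters', 'content too short']
```

Collecting all problems instead of raising on the first one lets an API return every validation message to the caller in a single response.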
Common Mistakes
❌ Wrong: Not calling initialize()
memory = LLMemory(connection_string="...")
results = await memory.search(...)  # Error: not initialized
✅ Right: Always call initialize()
memory = LLMemory(connection_string="...")
await memory.initialize()  # Required!
results = await memory.search(...)
❌ Wrong: Not closing connections
memory = LLMemory(connection_string="...")
await memory.initialize()
# ... use memory ...
# Missing: await memory.close()
✅ Right: Use context manager
async with LLMemory(connection_string="...") as memory:
    # ... use memory ...
    pass
# Automatically closed
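The context manager form is safer because cleanup runs even when the body raises. The sketch below demonstrates the mechanism with a stand-in class, FakeMemory, which is not the real LLMemory; that LLMemory's own context manager calls initialize() on entry and close() on exit is an assumption based on the example above.

```python
import asyncio
from typing import List


class FakeMemory:
    """Stand-in client with initialize()/close(); not the real LLMemory."""

    def __init__(self) -> None:
        self.events: List[str] = []

    async def initialize(self) -> None:
        self.events.append("initialize")

    async def close(self) -> None:
        self.events.append("close")

    async def __aenter__(self) -> "FakeMemory":
        await self.initialize()
        return self

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        await self.close()
        return False  # Do not swallow the exception


async def demo() -> List[str]:
    memory = FakeMemory()
    try:
        async with memory:
            raise RuntimeError("failure inside the block")
    except RuntimeError:
        pass
    return memory.events


print(asyncio.run(demo()))
# ['initialize', 'close']
```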
❌ Wrong: Passing an owner_id without verifying it
results = await memory.search(
    owner_id="workspace-1",  # Hard-coded or taken from the request unchecked
    query_text="sensitive data"
)
# Results come only from workspace-1 (good!),
# but nothing verifies that owner_id matches the current user
✅ Right: Always validate owner_id
current_workspace = get_current_workspace()  # Application-specific lookup
results = await memory.search(
    owner_id=current_workspace,  # Validated owner
    query_text="sensitive data"
)
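One way to implement that validation is to compare the requested owner_id against the workspaces the authenticated user belongs to before it ever reaches llmemory. This is a minimal sketch under that assumption; resolve_owner_id and WorkspaceAccessError are hypothetical application-level names, not part of llmemory.

```python
from typing import Iterable


class WorkspaceAccessError(Exception):
    """Raised when a user asks for a workspace they do not belong to."""


def resolve_owner_id(requested: str, allowed: Iterable[str]) -> str:
    """Return requested only if it is one of the user's own workspaces."""
    if requested not in set(allowed):
        raise WorkspaceAccessError(f"access to {requested!r} is not allowed")
    return requested


# The allowed list would come from the application's session or auth layer
user_workspaces = ["workspace-1", "workspace-7"]
print(resolve_owner_id("workspace-1", user_workspaces))  # workspace-1
```

The returned value can then be passed as owner_id to search() or any other owner-scoped call.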
Related Skills
- •hybrid-search - Vector + BM25 hybrid search patterns
- •multi-query - Query expansion and multi-query retrieval
- •multi-tenant - Multi-tenant isolation patterns for SaaS
- •rag - Building complete RAG systems with reranking
Important Notes
Multi-Tenancy:
Always provide owner_id for proper data isolation. llmemory automatically filters all operations by owner.
Connection Pooling:
For production applications with multiple services, use from_db_manager() with a shared connection pool (see pgdbm-shared-pool skill).
Chunking:
Documents are automatically chunked during add_document(). The default strategy is hierarchical chunking, which creates parent and child chunks for better retrieval.
Embeddings:
Embeddings are generated automatically unless generate_embeddings=False. For batch operations, consider using background processing.
Search Types:
- •VECTOR: Best for semantic similarity
- •TEXT: Best for exact keyword matching
- •HYBRID: Best for most use cases (combines both)