kkrpc Language Interop
Implement kkrpc client/server in any programming language to communicate with TypeScript kkrpc endpoints.
Overview
kkrpc is a TypeScript-first bidirectional RPC library. This skill teaches you how to implement language interop clients/servers in any programming language to communicate with kkrpc TypeScript endpoints.
Supported Reference Implementations
| Language | Location | Transports |
|---|---|---|
| Go | interop/go/kkrpc/ | stdio, WebSocket |
| Python | interop/python/kkrpc/ | stdio, WebSocket |
| Rust | interop/rust/src/ | stdio, WebSocket |
| Swift | interop/swift/Sources/kkrpc/ | stdio, WebSocket |
Core Protocol
Message Format (JSON-only for interop)
All messages are line-delimited JSON (newline-terminated UTF-8 strings).
{
"id": "uuid-string",
"type": "request|response|callback|get|set|construct",
"version": "json",
"method": "optional.method.path",
"args": [...],
"path": ["optional", "property", "path"],
"value": "optional-value-for-set",
"callbackIds": ["optional-callback-ids"]
}
Message Types
| Type | Purpose | Required Fields |
|---|---|---|
request | Remote method call | id, method, args |
response | Return value or error | id, args.result or args.error |
callback | Invoke callback function | method (callback id), args |
get | Property read | id, path |
set | Property write | id, path, value |
construct | Constructor call | id, method, args |
Request Message Example
{
"id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
"type": "request",
"version": "json",
"method": "math.add",
"args": [1, 2],
"callbackIds": []
}
Response Message Example (Success)
{
"id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
"type": "response",
"version": "json",
"method": "",
"args": {
"result": 3
}
}
Response Message Example (Error)
{
"id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
"type": "response",
"version": "json",
"method": "",
"args": {
"error": {
"name": "Error",
"message": "Division by zero"
}
}
}
Callback Encoding
Functions are encoded as string markers: __callback__<uuid>
When sending a callback:
- •Generate a UUID for the callback
- •Store the callable with that ID
- •Send
"__callback__<uuid>"as the argument - •Include the callback ID in
callbackIdsarray
When receiving a callback marker:
- •Create a wrapper function that sends a
callbackmessage - •The callback message uses the ID as the
methodfield - •Invoke your stored callable when
type: "callback"arrives
UUID Generation
Format: 4 hex parts joined with - (e.g., a1b2c3d4-e5f6-7890-abcd-ef1234567890)
Go
func GenerateUUID() string {
parts := make([]string, 0, 4)
for i := 0; i < 4; i++ {
parts = append(parts, fmt.Sprintf("%x", rand.Int63()))
}
return fmt.Sprintf("%s-%s-%s-%s", parts[0], parts[1], parts[2], parts[3])
}
Python
def generate_uuid() -> str:
return "-".join(f"{random.getrandbits(53):x}" for _ in range(4))
Rust
pub fn generate_uuid() -> String {
let mut rng = rand::thread_rng();
let parts: Vec<String> = (0..4)
.map(|_| format!("{:x}", rng.gen::<u64>()))
.collect();
parts.join("-")
}
Swift
public func generateUUID() -> String {
let parts = (0..<4).map { _ in
String(format: "%llx", UInt64.random(in: 0..<UInt64.max))
}
return parts.joined(separator: "-")
}
Transport Layer
Transport Interface
Every transport must implement:
read() -> string? // Read one line/message, null/None if closed write(message) // Write message (with newline for stdio) close() // Close connection
Stdio Transport
- •Read lines from input stream (blocking)
- •Write messages to output stream with
\nsuffix - •Flush immediately after writing
WebSocket Transport
- •Send/receive text frames containing JSON
- •Handle connection handshake (RFC6455)
- •Mask client-to-server frames
- •Read frames and extract payload
Language-Specific Transport Patterns
Go (Interface):
type Transport interface {
Read() (string, error)
Write(message string) error
Close() error
}
Python (ABC):
class Transport(ABC):
@abstractmethod
def read(self) -> Optional[str]: ...
@abstractmethod
def write(self, message: str) -> None: ...
@abstractmethod
def close(self) -> None: ...
Rust (Trait):
pub trait Transport: Send + Sync {
fn read(&self) -> Option<String>;
fn write(&self, message: &str) -> Result<(), String>;
fn close(&self);
}
Swift (Protocol):
public protocol Transport {
func read() async throws -> String?
func write(_ message: String) async throws
func close() async
}
Client Implementation
Client Responsibilities
- •Request Management: Track pending requests by ID
- •Callback Storage: Store callbacks by ID
- •Response Handling: Route responses to waiting callers
- •Read Loop: Continuously read messages in background
Client Methods
| Method | Purpose |
|---|---|
call(method, ...args) | Invoke remote method |
get(path[]) | Read remote property |
set(path[], value) | Write remote property |
construct(method, ...args) | Call constructor |
Client Request Flow
1. Generate request ID 2. Create pending request entry (ID -> Promise/Channel) 3. Process arguments (encode callbacks to __callback__<id>) 4. Build message payload 5. Serialize to JSON + newline 6. Write to transport 7. Block/wait for response 8. Return result or throw error
Callback Encoding Flow
For each argument:
If callable:
- Generate callback ID
- Store callable with ID
- Replace with "__callback__<id>"
- Add ID to callbackIds array
Else:
- Pass through as-is
Response Handling
On receive message:
If type == "response":
- Look up pending request by ID
- If args has "error", reject/throw
- Else resolve with args["result"]
- Remove from pending
If type == "callback":
- Get callback ID from "method" field
- Look up stored callback
- Invoke with "args" array
Server Implementation
Server Responsibilities
- •API Registration: Store method handlers by path
- •Request Dispatch: Route requests to handlers
- •Path Resolution: Resolve dot-notation paths (e.g., "math.add")
- •Callback Wrapping: Convert callback markers to callable wrappers
- •Response Sending: Send results or errors back
Message Handlers
| Type | Handler Logic |
|---|---|
request | Resolve path, get handler, wrap callbacks, invoke, send response |
get | Resolve path, return value at path |
set | Resolve parent path, set property, return true |
construct | Like request, but for constructors |
Path Resolution
Split method by . and traverse API object:
api = {
"math": {
"add": (a, b) => a + b
}
}
Path: ["math", "add"] -> resolves to function
Callback Wrapping
When receiving __callback__<id> in arguments:
Create wrapper function that:
- Sends message with:
- type: "callback"
- method: <callback-id>
- args: wrapper arguments
- id: original request ID
- Writes to transport
Error Response Format
{
"id": "request-id",
"type": "response",
"version": "json",
"method": "",
"args": {
"error": {
"name": "ErrorClassName",
"message": "Error description"
}
}
}
Implementation Patterns by Language
Go Patterns
// Client struct
type Client struct {
transport Transport
pending map[string]chan responsePayload
callbacks map[string]Callback
mu sync.Mutex
}
// Handler signature
api.Register("math.add", func(args []any) any {
return args[0].(float64) + args[1].(float64)
})
// Response channel pattern
responseCh := make(chan responsePayload, 1)
pending[requestID] = responseCh
// ... send request ...
response := <-responseCh
Python Patterns
# Threading-based client
class RpcClient:
def __init__(self, transport: Transport):
self._transport = transport
self._pending: Dict[str, PendingRequest] = {}
self._callbacks: Dict[str, Callable] = {}
self._lock = threading.Lock()
self._reader_thread = threading.Thread(target=self._read_loop, daemon=True)
self._reader_thread.start()
# Queue-based response waiting
pending = PendingRequest(queue=queue.Queue(maxsize=1))
self._pending[request_id] = pending
# ... send request ...
response = pending.queue.get()
Rust Patterns
// Arc + Mutex for shared state
pub struct Client {
transport: Arc<dyn Transport>,
pending: Arc<Mutex<HashMap<String, Sender<ResponsePayload>>>>,
callbacks: Arc<Mutex<HashMap<String, Callback>>>,
}
// Arg enum for value vs callback
pub enum Arg {
Value(Value),
Callback(Callback),
}
// mpsc channel for responses
let (sender, receiver) = mpsc::channel();
pending.lock().unwrap().insert(request_id, sender);
let response = receiver.recv().expect("response");
Swift Patterns
// Actor for thread-safe state
public actor Client {
private var pending: [String: CheckedContinuation<ResponsePayload, Never>] = [:]
private var callbacks: [String: Callback] = [:]
}
// Async/await with continuation
return await withCheckedContinuation { continuation in
pending[requestId] = continuation
}
// Handler typealias
public typealias Handler = ([Any]) -> Any
public typealias Callback = ([Any]) -> Void
TypeScript Compatibility
Required kkrpc Settings
When creating RPCChannel on TypeScript side for interop:
const rpc = new RPCChannel(io, {
expose: api,
serialization: { version: "json" } // REQUIRED for interop
})
SuperJSON Note
kkrpc defaults to SuperJSON which supports Date, Map, Set, BigInt, Uint8Array. Interop implementations only support JSON - complex types will not work.
Supported Types
| Type | JSON Support |
|---|---|
| Number | ✓ (float64) |
| String | ✓ |
| Boolean | ✓ |
| null | ✓ |
| Array | ✓ |
| Object | ✓ |
| Date | ✗ (use ISO string) |
| BigInt | ✗ (use string) |
| Uint8Array | ✗ (use base64) |
| Map/Set | ✗ (convert to object/array) |
Testing Strategy
Test Against Reference Server
Use the Node.js test server: interop/node/server.ts
# Terminal 1: Start server bun interop/node/server.ts # Terminal 2: Run your client your-client-app
API to Test Against
const api = {
math: {
add(a: number, b: number): number
},
echo<T>(value: T): T,
withCallback(value: string, cb: (payload: string) => void): string,
counter: number,
settings: {
theme: string,
notifications: { enabled: boolean }
}
}
Test Cases
- •Basic call:
math.add(1, 2)→3 - •Echo:
echo({"hello": "world"})→ same object - •Callback:
withCallback("test", cb)→ cb invoked with "callback:test" - •Property get:
await api.counter→42 - •Property get nested:
await api.settings.theme→"light" - •Property set:
api.counter = 100→ success - •Error handling: Call non-existent method → throws error
Common Pitfalls
1. Missing Newlines
Always terminate JSON messages with \n for stdio transport.
2. JSON Number Precision
JSON numbers are float64. Large integers may lose precision.
3. Callback Memory Leaks
Clean up callback entries after they're invoked or when connection closes.
4. Path Resolution
Method names use dot-notation: "math.add" → api["math"]["add"]
5. Thread Safety
Multiple concurrent requests require proper synchronization:
- •Go: sync.Mutex
- •Python: threading.Lock
- •Rust: Arc<Mutex<>>
- •Swift: Actor
6. Error Propagation
Always include both name and message in error responses.
Step-by-Step Implementation Guide
Phase 1: Protocol Layer
- •Implement UUID generator (4 hex parts)
- •Implement JSON encode/decode with newline handling
- •Define error types
Phase 2: Transport Layer
- •Define Transport interface/trait/ABC
- •Implement StdioTransport
- •(Optional) Implement WebSocketTransport
Phase 3: Client
- •Create Client struct/class
- •Implement request ID tracking (pending map)
- •Implement callback storage
- •Implement read loop (background thread/task)
- •Implement
call()method - •Implement response routing
- •Implement callback invocation
Phase 4: Server
- •Create Server struct/class
- •Implement API registration
- •Implement read loop
- •Implement message dispatch (switch on type)
- •Implement path resolution
- •Implement callback wrapping
- •Implement request handler
- •Implement get/set handlers
Phase 5: Testing
- •Test against
interop/node/server.ts - •Test callbacks
- •Test property access
- •Test error handling
- •Test concurrent requests
Example: Minimal Implementation Template
# Minimal Python implementation template
import json
import random
import threading
import queue
from typing import Dict, Any, Optional, Callable, List
CALLBACK_PREFIX = "__callback__"
def generate_uuid() -> str:
return "-".join(f"{random.getrandbits(53):x}" for _ in range(4))
def encode_message(payload: Dict[str, Any]) -> str:
return json.dumps(payload, ensure_ascii=False) + "\n"
def decode_message(message: str) -> Dict[str, Any]:
return json.loads(message)
class Transport:
def read(self) -> Optional[str]: raise NotImplementedError
def write(self, message: str) -> None: raise NotImplementedError
def close(self) -> None: raise NotImplementedError
class RpcClient:
def __init__(self, transport: Transport):
self._transport = transport
self._pending: Dict[str, queue.Queue] = {}
self._callbacks: Dict[str, Callable] = {}
self._lock = threading.Lock()
threading.Thread(target=self._read_loop, daemon=True).start()
def call(self, method: str, *args: Any) -> Any:
request_id = generate_uuid()
response_queue = queue.Queue(maxsize=1)
with self._lock:
self._pending[request_id] = response_queue
# Process callbacks in args
processed_args = []
callback_ids = []
for arg in args:
if callable(arg):
cb_id = generate_uuid()
self._callbacks[cb_id] = arg
callback_ids.append(cb_id)
processed_args.append(f"{CALLBACK_PREFIX}{cb_id}")
else:
processed_args.append(arg)
payload = {
"id": request_id,
"type": "request",
"version": "json",
"method": method,
"args": processed_args,
}
if callback_ids:
payload["callbackIds"] = callback_ids
self._transport.write(encode_message(payload))
return response_queue.get()
def _read_loop(self):
while True:
line = self._transport.read()
if line is None:
break
message = decode_message(line.strip())
msg_type = message.get("type")
if msg_type == "response":
request_id = message.get("id")
with self._lock:
q = self._pending.pop(request_id, None)
if q:
q.put(message.get("args", {}).get("result"))
elif msg_type == "callback":
cb_id = message.get("method")
cb = self._callbacks.get(cb_id)
if cb:
cb(*message.get("args", []))
References
- •Go implementation:
interop/go/kkrpc/ - •Python implementation:
interop/python/kkrpc/ - •Rust implementation:
interop/rust/src/lib.rs - •Swift implementation:
interop/swift/Sources/kkrpc/ - •Protocol spec:
interop/README.md - •TypeScript core:
packages/kkrpc/src/serialization.ts