Interpolated Inputs in Prompts
- Anand Nerurkar
- Nov 25
- 18 min read
Interpolated inputs in prompts are dynamic variables that are “injected” into a prompt at runtime instead of being hard-coded. They let you build prompt templates whose parameters change based on context, user input, or system data.
✅ What Are Interpolated Inputs?
It's like having placeholders inside your prompt that get filled in at runtime, just before the prompt is sent to the model.
Example (simple)
Template prompt:
You are analyzing a loan application. Applicant name: {{name}},
Credit score: {{credit_score}},
Loan amount: {{amount}}.
Provide risk assessment.
Runtime inputs get interpolated:
name = "Ravi"
credit_score = 715
amount = 12,00,000
Interpolated final prompt becomes:
You are analyzing a loan application. Applicant name: Ravi,
Credit score: 715,
Loan amount: 12,00,000.
Provide risk assessment.
✅ Why Use Interpolated Inputs?
1. Reuse same prompt for many scenarios
You create one template → fill values dynamically.
2. Prevent hard-coding
Useful in applications, pipelines, agents, workflows.
3. Works with LLM Orchestration tools
LangChain
LlamaIndex
Azure AI Foundry
AWS Bedrock Agents
OpenAI Assistants API
🔥 Examples of Interpolation
1. With a Plain Python Template (str.format)
prompt = """
Generate loan eligibility.
Name: {name}
Age: {age}
Income: {income}
"""
final_prompt = prompt.format(
    name="Anand",
    age=35,
    income=2400000
)
2. With LangChain
from langchain.prompts import PromptTemplate
template = """
Summarize customer complaint.
Customer ID: {cust_id}
Complaint: {complaint}
"""
prompt = PromptTemplate.from_template(template)
final_prompt = prompt.format(
    cust_id="C1021",
    complaint="ATM cash not dispensed but amount deducted."
)
3. With Azure AI Foundry Prompt Flow
YAML prompt:
input_variables:
  - name: policy
  - name: loan_amount
template: |
  You are a loan policy expert.
  Policy: {{policy}}
  Loan amount: {{loan_amount}}
  Give approval decision.
The {{policy}} and {{loan_amount}} get injected dynamically.
🔥 Real Banking Use Case Example (GenAI)
Prompt Template
You are an intelligent banking assistant.
KYC extracted fields:
Name: {{name}}
PAN: {{pan}}
Income: {{income}}
Loan Product: {{product}}
Lending Policy Version: {{policy_version}}
Use the above values and respond with:
- Decision
- Reason
- Missing Information
Dynamic Inputs from OCR + ML Pipeline:
{
  "name": "Anand",
  "pan": "ABCDE1234G",
  "income": 2200000,
  "product": "HomeLoan",
  "policy_version": "v2.1"
}
Prompt becomes personalized for each customer.
🧠 Where Interpolated Inputs Are Critical
✓ LLM Agents
Agent gets facts → injects into system prompt.
✓ RAG / Vector Search
Chunks retrieved → added as interpolated context:
Relevant policy: {{retrieved_context}}
User query: {{query}}
✓ MLOps + LLMOps pipelines
Data from ML model → injected into LLM prompt.
✓ Back Office Automation
Ticket ID, transaction data, logs → interpolated.
📌 Summary
Term | Meaning |
Interpolated Inputs | Dynamic variables injected into prompts |
Purpose | Re-use templates, keep prompts clean, adapt to context |
Used in | RAG, Agents, Automation, MLOps, LLMOps |
Syntax | {variable}, {{variable}}, $variable based on framework |
How to confirm tool responses are valid and are not causing the agent to requeue the same task.
=====
Below is a clear, production-grade method to confirm tool responses are valid and prevent an agent from re-queuing the same task. This applies to OpenAI Agents, LangChain Agents, Azure Agents, and any autonomous workflow with tool calls.
✅ Problem
LLM agents sometimes:
Get invalid tool responses (empty, malformed, missing fields)
Re-queue the same tool request repeatedly
Fall into “looping behaviour”
✅ Solution Overview
You must implement three layers of safety:
Tool Response Schema Validation
Idempotency + Task Completion Checks
Agent Memory / Context Guardrails (Loop Prevention)
🟦 1. Tool Response Schema Validation (Hard Guarantee)
Every tool must return strict JSON with validated fields.
Example Schema
{
  "status": "success | failure",
  "task_id": "string",
  "data": {},
  "error_message": "string or null"
}
Agent Instructions
Inside your system prompt / agent definition, enforce:
All tool results MUST include:
- status
- task_id
- data or error_message
If results do not follow schema, ask the tool to resend the response.
NEVER requeue the same task unless the tool returns a 'retry' flag.
Backend Check
Your tool implementation must:
Validate outgoing fields
Reject malformed responses before sending to LLM
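A minimal sketch of that backend check in Python, assuming the schema shown above; validate_tool_response and the exact field set are illustrative, not a specific framework API:
REQUIRED_FIELDS = {"status", "task_id", "data", "error_message"}

def validate_tool_response(response: dict) -> bool:
    # Reject malformed responses before they ever reach the LLM
    if not isinstance(response, dict):
        return False
    if not REQUIRED_FIELDS.issubset(response):
        return False
    if set(response) - REQUIRED_FIELDS - {"retry"}:
        return False  # unexpected fields
    if response["status"] not in ("success", "failure"):
        return False
    if response["status"] == "success" and not isinstance(response["data"], dict):
        return False
    if response["status"] == "failure" and not response["error_message"]:
        return False
    return True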
🟦 2. Idempotency Keys + Task Completion Markers
To avoid infinite loops, the agent must NEVER re-run the same tool call unless explicitly required.
Implement:
A. Idempotency Key
Each task gets a consistent ID:
task_id = hash(user_input + context)
B. Task Status Table
task_id | status | last_updated |
xyz123 | completed | 15:30 |
C. Agent Enforcement
Add instruction:
Before calling a tool, check if this task_id was already executed.
If yes, do not call the tool again. Instead use the existing result.
🟦 3. Loop Prevention Rules for the Agent
In your system prompt:
You must never call the same tool with the same arguments more than once.
If the tool returns an invalid result, do NOT retry automatically.
Ask for human clarification if the same failure happens twice.
Retry Strategy
Attempt | Action |
1 | Retry tool once with a clearer message |
2 | Ask human for clarification |
3 | Abort safely |
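A sketch of this retry ladder, reusing the validate_tool_response() check sketched in section 1; call_tool() and ask_human() are hypothetical stand-ins for your tool backend and escalation channel:
def run_with_retry_policy(tool_name: str, args: dict) -> dict:
    # Attempt 1: call once; retry once with a clearer message if malformed
    response = call_tool(tool_name, args)
    if validate_tool_response(response):
        return response
    response = call_tool(tool_name, {**args, "note": "resend using required schema"})
    if validate_tool_response(response):
        return response
    # Attempt 2: same failure twice -> ask a human for clarification
    ask_human(f"{tool_name} returned malformed output twice for {args}")
    # Attempt 3: abort safely rather than loop
    raise RuntimeError(f"Aborting {tool_name}: two invalid responses")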
🟩 Full Example (Production)
System Prompt for Agent
When using tools:
1. Each tool call must include a unique task_id.
2. Before calling a tool, check if the task_id has a known status of "completed".
If completed, do NOT call the tool again.
3. Validate tool responses:
- Must include: status, task_id, data, error_message
- If the response is invalid, ask the tool for a corrected response ONCE.
4. Never attempt the same tool with identical arguments more than once.
5. If the tool fails twice, escalate to the user.
🟦 Tool Implementation Pattern
Tool returns:
{
  "status": "success",
  "task_id": "loan_eval_1021",
  "data": {
    "decision": "approved",
    "score": 91
  },
  "error_message": null
}
Agent validates before continuing:
schema correct?
task completed?
data is usable?
If not → agent requests corrected output, NOT tool re-execution.
🟥 The Biggest Mistakes (You Must Avoid)
❌ Agent retries same tool call with same args → infinite loop
❌ Tools returning inconsistent JSON
❌ No schema enforcement
❌ No idempotency → multiple calls for same job
❌ Agent tries to “fix” tool output by calling tool again
Below are all three items: complete, production-ready, and tailored for GenAI, banking, automation, and AI engineering teams.
✅ 1. Loop Prevention Guardrail Block (System Prompt)
Copy-paste this directly into your agent system prompt. It prevents re-queues, duplicate tool calls, invalid responses, and agent loops.
🔒 Loop Prevention & Tool Validation Guardrail Block
TOOL USAGE RULES — STRICT POLICY
1. EVERY tool call MUST contain:
- task_id (string, unique for the given input)
- parameters (arguments passed to tool)
2. BEFORE calling a tool, you MUST check:
- Has this exact task_id been executed already?
- Has this tool already been invoked with identical arguments?
If YES → DO NOT call the tool again.
Instead, reuse the prior tool response.
3. TOOL RESPONSE VALIDATION:
Every tool response must contain the following fields:
- status: "success" | "failure"
- task_id: string
- data: JSON object (mandatory for success)
- error_message: string or null (mandatory for failure)
4. If the tool response is malformed or missing fields:
- Ask the tool ONCE for a corrected response.
- Do NOT requeue the same tool task with the same arguments.
5. RETRY POLICY:
- Only ONE retry allowed for malformed tool output.
- If the tool fails twice, escalate to the human user.
- Never loop or call the same tool again automatically.
6. IDEMPOTENCY:
- Never call a tool more than once with the same (task_id + arguments).
- Treat all tool operations as idempotent unless the tool explicitly returns:
{ "status": "failure", "retry": true }
7. SAFETY:
- If conflict or ambiguity exists, STOP and ask for clarification.
- Never hallucinate a tool response if a tool fails.
You MUST strictly follow these rules to prevent infinite loops, duplicate work, or unsafe tool execution.
✅ 2. Production Tool Response Schema Templates
These schemas must be returned by every tool your agent uses (ML tool, retrieval tool, vector DB tool, OCR tool, KYC tool, transaction tool, etc.).
🟦 Standard Banking/GenAI Tool Response (Success)
{
  "status": "success",
  "task_id": "loan_eval_20251124_001",
  "data": {
    "decision": "approved",
    "approval_score": 0.92,
    "policy_version": "v2.1"
  },
  "error_message": null
}
🟥 Standard Failure Schema (Non-retryable)
{
  "status": "failure",
  "task_id": "loan_eval_20251124_001",
  "data": {},
  "error_message": "Missing income field in request"
}
🟧 Retryable Error Schema (Retry Allowed Once)
Use only when tool logic says retry is safe.
{
  "status": "failure",
  "task_id": "loan_eval_20251124_001",
  "data": {},
  "error_message": "Temporary DB connection issue",
  "retry": true
}
🟩 Validation Rules to Enforce
You must reject responses if:
❌ “status” missing
❌ “task_id” missing
❌ “data” missing on success
❌ “error_message” missing on failure
❌ Unexpected fields
❌ Output is not valid JSON
The agent must say:
“Tool response invalid — resend using required schema.”
✅ 3. Full Agent Code Examples (Idempotent + Loop-Safe)
Below are production-ready patterns for the OpenAI Assistants API, LangChain agents, and Azure AI Foundry agents.
🟦 A. OpenAI Assistants API Example (Production Template)
import hashlib
import json
from openai import OpenAI

client = OpenAI()

# ---- Idempotency store ----
completed_tasks = {}  # task_id -> response

def get_task_id(input_data):
    # Stable hash over the input so identical requests map to one task_id
    return hashlib.sha256(json.dumps(input_data, sort_keys=True).encode()).hexdigest()

def safe_tool_call(tool_name, arguments, task_id):
    # Check if already completed; if so, reuse the prior result
    if task_id in completed_tasks:
        return completed_tasks[task_id]
    # Make the tool call (call_tool_backend is your tool transport)
    response = call_tool_backend(tool_name, arguments)
    # Validate response schema (implements the rules from section 1)
    if not validate_tool_schema(response):
        raise ValueError("Invalid tool response schema")
    # Save result before returning
    completed_tasks[task_id] = response
    return response

assistant = client.beta.assistants.create(
    name="BankingAutomationAgent",
    model="gpt-5.1",
    instructions=open("guardrail_block.txt").read(),
    tools=[...]
)
This gives:
✔ No double calls
✔ Schema validation
✔ Prevents loops
✔ Reuses previous responses
🟦 B. LangChain Agent Example (Loop-Safe)
from langchain.agents import initialize_agent, AgentType
from langchain.chat_models import ChatOpenAI
from langchain.tools import Tool

task_cache = {}

def loan_eval_tool(args):
    task_id = args["task_id"]
    # Idempotency check: reuse the cached result for a known task_id
    if task_id in task_cache:
        return task_cache[task_id]
    response = backend_loan_eval(args)  # your loan-evaluation backend
    if validate_schema(response):       # schema check, as in section 1
        task_cache[task_id] = response
        return response
    else:
        raise ValueError("Invalid schema")

loan_tool = Tool(
    name="loan_evaluator",
    func=loan_eval_tool,
    description="Evaluates loan application."
)

agent = initialize_agent(
    tools=[loan_tool],
    llm=ChatOpenAI(temperature=0),  # initialize_agent requires an LLM
    agent=AgentType.OPENAI_FUNCTIONS,
    verbose=True,
    max_iterations=4,  # prevents infinite loops
    handle_parsing_errors=True
)
🟦 C. Azure AI Foundry Agent Example
task_registry = {}

def call_tool(tool_name, payload):
    task_id = payload["task_id"]
    # Do not call again if already completed
    if task_id in task_registry:
        return task_registry[task_id]
    result = run_azure_tool(tool_name, payload)  # your Azure tool invocation
    if is_valid_response(result):                # schema check, as in section 1
        task_registry[task_id] = result
        return result
    raise Exception("Invalid tool response")
Azure must include your system guardrail block in the agent definition.
🚀 You Now Have All 3 Deliverables
1. 🔒 Loop Prevention Guardrail Block → copy into system prompt
2. 🟦 Tool Response Schemas → use for every tool
3. 🟩 Agent Code Examples → OpenAI, LangChain, Azure
These guarantee:
No infinite loops
No duplicate calls
No re-queued tasks
Safe tool orchestration
Stable and predictable agent behaviour
How to confirm context is passed correctly (session_id, step_id, vector DB pointers), and check for context truncation (token limits) or accidental resets.
====
Below is the complete, production-grade approach to confirm context is passed correctly — including session_id, step_id, vector DB pointers, and how to detect + prevent context truncation or accidental resets.
This is exactly what high-maturity AI engineering teams (banking/enterprise/SaaS) implement.
✅ 1. How to Confirm Context Is Passed Correctly
Context includes:
session_id (conversation/session identity)
step_id (workflow or agent step identity)
vector DB pointers (retrieved context, chunk IDs, embedding versions, metadata)
state (task status, tool results, memory)
user intent + previous messages
You must validate each layer.
🟩 1A. Validate Session Context (session_id)
What to check:
Same session_id is available in every step
Agent/system does not regenerate a new ID
Backend consistently injects session during each request
Add guardrail to system prompt:
You MUST include the provided session_id in every tool call and every response.
If session_id is missing at any point, stop and request it again.
Backend validation:
def validate_session(request):
    if "session_id" not in request:
        raise ValueError("Missing session_id")
    if request["session_id"] != expected_session:  # expected_session comes from your session store
        raise ValueError("Session mismatch")
🟦 1B. Validate Step Context (step_id)
Each action must carry a step_id so the agent cannot mix steps.
Enforce:
step_id increment
step used only once
step not reused across tools
System rule:
Every tool call must include:
- session_id
- step_id
The agent must verify step_id order and never reuse older step_ids.
Backend check:
if step_id <= last_step_id:
    raise Exception("Out-of-order step. Possible context reset.")
🟪 1C. Validate Vector DB Context (RAG pointers)
To confirm vector search results are correct:
Each retrieved chunk must include:
chunk_id
document_id
policy_version
embedding_version
source_pointer (S3/Blob/Git path)
score
metadata
Example returned record:
{
  "chunk_id": "loan_policy_v2_chunk_34",
  "doc_id": "loan_policy_v2",
  "embedding_version": "v3.2",
  "score": 0.91,
  "source": "blob://policy/loan_policy_v2.pdf",
  "text": "Loan > ₹15 lakh requires income proof..."
}
Agent guardrail:
Before answering, verify that:
1. All retrieved chunks include embedding_version and chunk_id.
2. All chunks come from a single, consistent policy_version.
If missing or inconsistent → requery the vector DB.
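A sketch of that chunk validation, assuming each record carries the fields shown in the example above and that a policy_version is stamped on every chunk:
REQUIRED_CHUNK_FIELDS = {"chunk_id", "doc_id", "embedding_version", "score", "source"}

def validate_chunks(chunks: list) -> bool:
    # Every chunk must carry its pointers; all chunks must agree on one policy version
    if not chunks:
        return False  # nothing retrieved -> requery the vector DB
    for chunk in chunks:
        if not REQUIRED_CHUNK_FIELDS.issubset(chunk):
            return False  # missing pointer fields -> requery
    versions = {c.get("policy_version") for c in chunks}
    return len(versions) == 1 and None not in versions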
🟧 2. Detecting Context Truncation (Token Limits)
Symptoms:
Agent “forgets” session info
Missing earlier tool results
Vector pointers disappear
Step history lost
Unexpected contradictions
How to Detect:
A. Add a context checksum
Include in every message:
context_hash = hash(session_id + last_step_id + key_state_variables)
If model returns different hash → context dropped.
B. Track message token size
Before each LLM call:
if tokens(messages) > MAX_CONTEXT_LIMIT * 0.85:
    trigger_compression()
If exceeded:
compress old history
store in external memory (Redis, DB)
C. Use “context heartbeat” markers
Add this to system prompt:
In every response, echo back:
- session_id
- step_id
- active_policy_version
This is mandatory and used to detect context loss.
If missing → context lost.
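A sketch combining the three detectors; hashlib is standard library, while count_tokens() and trigger_compression() are hypothetical helpers (e.g., a tiktoken wrapper and your summarizer hook), and MAX_CONTEXT_LIMIT is an assumed window size:
import hashlib

MAX_CONTEXT_LIMIT = 128_000  # assumed model context window, in tokens

def context_hash(session_id: str, last_step_id: int, key_state: dict) -> str:
    raw = f"{session_id}:{last_step_id}:{sorted(key_state.items())}"
    return hashlib.sha256(raw.encode()).hexdigest()[:16]

def truncation_suspected(messages: list, echoed: dict, expected: dict) -> bool:
    # A. checksum mismatch means the model dropped state
    if echoed.get("context_hash") != expected["context_hash"]:
        return True
    # B. approaching the token limit -> compress before the next call
    if count_tokens(messages) > MAX_CONTEXT_LIMIT * 0.85:
        trigger_compression(messages)
    # C. heartbeat fields must be echoed back on every turn
    return any(echoed.get(k) != expected[k]
               for k in ("session_id", "step_id", "active_policy_version"))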
🟥 3. Detect Accidental Context Resets
Situations where LLM resets:
Temperature too high
Conversation exceeds token limit
Multi-step agent chain broke
Wrong system prompt injection
Function/tool calls override context
How to detect:
A. Insert sentinel markers
Example:
[SENTINEL_SESSION_START: {{session_id}}]
[SENTINEL_POLICY_VERSION: v2.1]
Check for presence in next model response.
If missing → context reset → force rebuild.
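A minimal sketch of that sentinel check; the marker format follows the example above:
def sentinels_present(model_response: str, session_id: str, policy_version: str) -> bool:
    # If either sentinel is missing, treat the context as reset and rebuild
    return (
        f"[SENTINEL_SESSION_START: {session_id}]" in model_response
        and f"[SENTINEL_POLICY_VERSION: {policy_version}]" in model_response
    )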
B. Last-known-state shadow copy
Store last state externally:
shadow_state = {
    "session_id": ...,
    "step_id": ...,
    "policy_version": ...,
    "embedding_version": ...,
    "task_cache": ...
}
After every LLM response:
if llm_response.state != shadow_state:
    print("Context mismatch detected")
C. Defensive prompting
Include this:
If any required context variable disappears (session_id, step_id, embedding_version, retrieved_chunks),
STOP EXECUTION and request the missing context.
Do not hallucinate missing variables.
🟩 4. Full Production Template to Add into Agent System Prompt
CONTEXT INTEGRITY RULES
1. ALWAYS carry forward:
- session_id
- step_id
- vector_db_chunk_ids
- policy_version
- embedding_version
2. If any required context variable is missing, ambiguous, or inconsistent,
STOP and request it again.
3. Echo back:
- session_id
- step_id
- policy_version
in every intermediate step.
This ensures detection of accidental context resets.
4. Verify vector DB results:
- Each record must include chunk_id, doc_id, embedding_version, and metadata.
- If any field is missing → requery the vector DB.
5. You must never proceed with missing state, partial context, or truncated context.
If truncation is suspected → request full context refresh.
🟩 5. How to Test Context Passing (Red-Team Tests)
Test 1 — Token Flood
Feed huge context and ensure agent still retains:
session_id
step_id
chunk pointers
Test 2 — Simulated Truncation
Remove mid-history and see if the agent requests context refresh.
Test 3 — Vector DB Pointer Swap
Switch embedding version and see if agent detects mismatch.
Test 4 — Session ID Tampering
Modify session ID mid-step → agent should stop.
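These four tests can be scripted as a pytest-style harness; run_agent_step(), make_huge_context(), drop_mid_history(), and SessionMismatchError are hypothetical pieces of your own test rig:
import pytest

def test_token_flood_retains_identifiers():
    # Test 1: huge context, identifiers must survive
    reply = run_agent_step(context=make_huge_context(), session_id="s1", step_id=7)
    assert reply.echoed_session_id == "s1" and reply.echoed_step_id == 7

def test_simulated_truncation_requests_refresh():
    # Test 2: drop mid-history, agent must ask for a context refresh
    reply = run_agent_step(context=drop_mid_history(), session_id="s1", step_id=8)
    assert reply.requested_context_refresh

def test_vector_pointer_swap_detected():
    # Test 3: swapped embedding version must be flagged
    reply = run_agent_step(embedding_version="v9.9", session_id="s1", step_id=9)
    assert reply.flagged_context_mismatch

def test_session_id_tampering_stops_agent():
    # Test 4: tampered session ID must halt execution
    with pytest.raises(SessionMismatchError):
        run_agent_step(session_id="s1-TAMPERED", step_id=10)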
✅ Summary
To confirm context is passed correctly, you must validate:
Context Item | Validation Method |
session_id | continuity + heartbeat |
step_id | monotonic increment + no reuse |
vector DB pointers | chunk_id, embedding_version consistency |
policy_version | alignment across chunks |
state | schema validation + shadow copy |
token truncation | heartbeat markers + context_hash |
context resets | sentinel + mandatory echo-back |
How to use rolling summaries (condense older history into a summary + store in memory), store canonical state in a lightweight session DB and pass only needed pieces to the model, use retrieval filters (metadata + recency) with a minimum similarity threshold, and detect drift: log semantic similarity between the current prompt and last-step context; if below threshold, rehydrate state.
=======
Below is the complete, production-grade implementation guide for all four capabilities:
Rolling summaries
Lightweight session DB for canonical state
Retrieval filters + minimum similarity threshold
Drift detection using semantic similarity + state rehydration
These patterns are used in ChatGPT-level agent frameworks, enterprise GenAI systems (banking/BFSI), multimodal agents, workflow orchestrators, and RAG platforms.
✅ 1. Rolling Summaries
Condense older conversation or tool-call history while keeping important context.
🎯 Goal
Keep LLM context small, but preserve meaning, decisions, state, and commitments.
🔹 A. Architecture Pattern
Full history (messages + tool results)
→ Summarizer LLM
→ rolling_summary
Use:
First 20–30 messages → compress
Anything older than N tokens → move to summary
Keep last 3–5 messages verbatim (never compress)
🔹 B. Rolling Summary Prompt Template
You are compressing conversation history.
Given the following previous summary and new messages:
- Previous summary: {{prior_summary}}
- New interactions: {{recent_history}}
Output:
1. Updated summary including decisions, commitments, variables.
2. A state delta (what changed).
3. A list of canonical facts with keys (session_id, step_id, policy_version, embedding_version).
Do NOT remove:
- session identifiers
- policies loaded
- tool results
- decisions made
🔹 C. When to Trigger Rolling Summary
Trigger when:
token usage > 70% of model’s context window
after each major tool call
at fixed intervals (e.g., every 10 steps)
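A sketch of those triggers; count_tokens() is a hypothetical tokenizer wrapper and CONTEXT_WINDOW an assumed window size:
CONTEXT_WINDOW = 128_000        # assumed model window, in tokens
SUMMARY_INTERVAL_STEPS = 10     # fixed interval from the list above

def should_compress(messages: list, step_id: int, just_ran_tool: bool) -> bool:
    return (
        count_tokens(messages) > CONTEXT_WINDOW * 0.70
        or just_ran_tool
        or step_id % SUMMARY_INTERVAL_STEPS == 0
    )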
🔹 D. Store Summaries Externally
Store in:
Redis
DynamoDB
MongoDB
Postgres
Azure Cosmos
Local KV store
Structure:
{
  "session_id": "12345",
  "rolling_summary": "...condensed text...",
  "last_updated": "2025-11-24T18:20"
}
✅ 2. Store Canonical State in Lightweight Session DB
Only pass essential state to the model, not the entire context.
🔹 A. Canonical State Examples
State includes:
session_id
step_id
last completed action
policy version
embedding version
vector DB chunk_ids
key variables (loan amount, risk score, user objective)
tool results
🔹 B. Canonical State DB Schema
{
  "session_id": "abcd1234",
  "step_id": 42,
  "canonical_state": {
    "policy_version": "v3.1",
    "embedding_version": "v1.5",
    "active_chunks": ["loan_v3_c34", "loan_v3_c35"],
    "loan_amount": 1200000,
    "risk_score": 0.89
  },
  "updated_at": "2025-11-24T18:21:00"
}
🔹 C. Pattern for Passing State to LLM
Instead of full history → send:
rolling_summary
canonical_state
last 3 messages
user prompt
Example:
{
  "rolling_summary": "...",
  "canonical_state": {...},
  "user_query": "...",
  "recent_messages": [...]
}
This prevents context loss & token explosion.
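A minimal sketch of this packing pattern, assuming the session record holds the rolling summary and canonical state:
def build_llm_payload(session: dict, user_query: str, recent_messages: list) -> dict:
    return {
        "rolling_summary": session["rolling_summary"],
        "canonical_state": session["canonical_state"],
        "recent_messages": recent_messages[-3:],  # keep last 3 messages verbatim
        "user_query": user_query,
    }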
✅ 3. Use Retrieval Filters + Minimum Similarity Threshold
Ensures ONLY relevant chunks are used, and irrelevant junk is filtered out.
🔹 A. Retrieval Configuration
Filters:
metadata filters
recency filters
version filters
VDB Query Example:
results = vector_db.search(
    query_embedding,
    top_k=12,
    filters={
        "policy_version": "v3.1",
        "type": "lending_policy",
        "is_active": True,
        "tenant_id": "ICICI"
    }
)
🔹 B. Minimum Similarity Threshold
If similarity < 0.75 → discard chunk.
filtered = [r for r in results if r.score >= 0.75]
If no chunks meet threshold → fallback:
"Requery using broad search" OR "Ask user for clarification"
🔹 C. Include metadata validation inside agent prompt:
You MUST only use vector DB results if:
- metadata.policy_version matches canonical_policy_version
- similarity_score >= 0.75
- chunk_id exists
- source_id is present
If not satisfied, re-query or request clarification.
This eliminates hallucinated or incorrect context.
✅ 4. Detect Drift (Semantic Drift Check)
Check whether the model’s current prompt is consistent with last-step context. If not → rehydrate.
🔹 A. How to Detect Drift
Compute semantic similarity between:
current prompt embedding
last_step_context embedding
If similarity < 0.65 → drift detected.
Code:
sim = cosine_similarity(embed(cur_prompt), embed(last_context))
if sim < 0.65:
    drift = True
🔹 B. Drift Signals
Sudden topic change
Model forgetting step_id
Missing policy version
Missing chunk pointers
Contradicting previous decisions
Tool call arguments mismatched
🔹 C. Drift Handler (Rehydration Flow)
If drift = true:
Pull canonical_state from session DB
Pull rolling_summary
Rehydrate context:
system_prompt + rolling_summary + canonical_state
Reinsert last user query
Continue execution safely
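A sketch of that flow; session_db, summary_store, and SYSTEM_PROMPT are assumed pieces of your stack:
def rehydrate(session_id: str, last_user_query: str) -> list:
    state = session_db.get(session_id)          # 1. canonical state
    summary = summary_store.get(session_id)     # 2. rolling summary
    return [                                    # 3-5. rebuilt message stack
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "system", "content": f"Rolling summary:\n{summary}"},
        {"role": "system", "content": f"Canonical state:\n{state}"},
        {"role": "user", "content": last_user_query},
    ]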
🔹 D. Add Guardrail to System Prompt:
You MUST check semantic similarity between:
- your current response context
- last_step_state.context
If similarity < 0.65, you MUST request context rehydration
from canonical_state and rolling_summary before proceeding.
🎯 Putting It All Together (End-to-End Flow)
User Input → Retrieve canonical_state →
Retrieve rolling_summary →
Check drift →
If drift → Rehydrate context →
Perform RAG with filters + similarity threshold →
Update summary & canonical_state → Next step
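Tying it together, a sketch of one turn through this loop, reusing the helpers sketched in the sections above (all assumptions, not a specific framework API):
def handle_turn(session_id: str, user_query: str, recent_messages: list):
    session = session_db.get(session_id)
    # Drift check against last-step context
    sim = cosine_similarity(embed(user_query), embed(session["last_context"]))
    if sim < 0.65:
        messages = rehydrate(session_id, user_query)       # drift -> rehydrate
    else:
        messages = build_llm_payload(session, user_query, recent_messages)
    # Filtered RAG with minimum similarity threshold
    chunks = retrieve_with_fallback(embed(user_query), session["filters"])
    answer = call_llm(messages, chunks)                    # assumed LLM client
    # Update summary and canonical state for the next step
    if should_compress(recent_messages, session["step_id"], just_ran_tool=False):
        enqueue_summary_job(session_id)
    session_db.update(session_id, last_context=user_query,
                      step_id=session["step_id"] + 1)
    return answer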
🔥 Final Deliverable: Complete Playbook for Production
Your system now handles:
Conversation stability
State continuity
No context truncation
No accidental resets
Correct RAG behavior
Minimal hallucination
Drift detection
Guaranteed step-by-step consistency
⚙️ These capabilities can also be packaged into a full microservice architecture covering:
Rolling summaries
Semantic drift detection
Context rehydration
Canonical state DB
RAG filters
Context Integrity Microservice
Purpose: A production-ready microservice that ensures conversation context integrity for GenAI agents: rolling summaries, canonical session DB, retrieval filtering, semantic-drift detection, and state rehydration.
Targeted for banking/enterprise GenAI workflows.
1. Architecture Overview
Components:
API Gateway / Ingress — front door for client requests / agents.
Context Service (this microservice) — exposes REST/gRPC endpoints to manage session state, rolling summaries, and drift checks.
Session Store (Lightweight DB) — Redis (primary), fallback to Postgres for durability.
Rolling Summarizer Worker — uses LLM to compress older history (invoked synchronously or via queue).
Vector DB Interface — connector for Pinecone/Weaviate/pgvector; returns chunk metadata + embeddings.
Semantic Similarity Service — embedding provider + cosine calculator (can use external provider or local model service).
Task Cache / Idempotency Store — Redis keys for task_id -> result mapping.
Audit & Logs — ELK / Loki + structured logs.
Observability — Prometheus metrics, Grafana dashboards, distributed tracing (OpenTelemetry).
Sequence (high-level):
Agent sends POST /v1/session/{session_id}/query with user input + optional recent messages.
Context Service validates session, retrieves canonical state, rolling summary, and recent messages.
Run drift check (semantic sim). If drift, rehydrate and include canonical_state + rolling_summary.
Query Vector DB with metadata filters and min similarity threshold.
Call LLM with packed context (rolling_summary + canonical_state + filtered chunks + recent messages + user query).
Update canonical_state & rolling_summary asynchronously if needed.
2. API Contract (REST)
Authentication
JWT (short-lived) or mTLS for service-to-service calls
Endpoints
Create/Touch Session: POST /v1/session
Payload: { session_id, tenant_id, initial_state? }
Returns: 201, session metadata

Get Session: GET /v1/session/{session_id}
Returns: canonical state, rolling_summary pointer, last_step_id, last_updated

Query (main): POST /v1/session/{session_id}/query
Payload: { step_id, user_input, recent_messages[], metadata_filters?, top_k?, min_score? }
Behavior:
Validate session + step ordering
Run drift check
Retrieve canonical_state + rolling_summary
Query vector DB with filters and min_score
Return: { session_id, step_id, context_used: { rolling_summary_id, chunk_ids }, filtered_chunks[], rehydrated: bool } plus llm_payload (optional)

Update Canonical State (internal): PUT /v1/session/{session_id}/state
Payload: { step_id, canonical_state_delta }
Behavior: Merge atomically; increment step_id if provided.

Get Rolling Summary: GET /v1/session/{session_id}/summary
Returns: current summary text and version

Trigger Summary Compression: POST /v1/session/{session_id}/summary/compact
Payload: { trigger_reason, force?: bool }
Behavior: Enqueue summarizer worker; returns job id.

Health / Metrics: GET /health and /metrics
3. Data Models
Session Document (Redis hash / Postgres row)
{
  "session_id": "abcd-1234",
  "tenant_id": "icici",
  "last_step_id": 42,
  "canonical_state": {
    "policy_version": "v3.1",
    "embedding_version": "v1.5",
    "active_chunks": ["loan_v3_c34", "loan_v3_c35"],
    "key_facts": {"loan_amount": 1200000}
  },
  "rolling_summary_id": "rs_abcd_20251124_01",
  "rolling_summary_version": 3,
  "task_cache_key": "session:abcd:tasks",
  "created_at": "2025-11-24T18:21:00Z"
}
Rolling Summary Record (DB table / KV)
{
  "summary_id": "rs_abcd_20251124_01",
  "session_id": "abcd-1234",
  "summary_text": "<condensed text>",
  "last_messages_kept": 5,
  "created_at": "2025-11-24T18:21:00Z"
}
Vector DB Pointer
{
"chunk_id": "loan_v3_c34",
"doc_id": "loan_policy_v3",
"embedding_version": "v3.2",
"source": "s3://policies/loan_policy_v3.pdf",
"score": 0.92,
"metadata": { "policy_version": "v3.1", "tenant_id": "icici" }
}
4. Core Logic (Implementation Patterns)
A. Request Validation & Session Guard
Verify session_id present
Verify step_id monotonic (step_id > last_step_id)
Validate JWT and tenant scoping
Acquire lightweight lock (Redis SETNX) for session write operations
B. Drift Check
Embed user_input + recent_messages -> vector
Compare with rolling_summary_embedding or last_context_embedding
If cosine_sim < DRIFT_THRESHOLD (e.g., 0.65) → set rehydrate=true
C. Vector DB Retrieval
Build query with metadata_filters (policy_version, tenant_id, doc_type)
Search top_k (configurable)
Apply min_score filter (e.g., 0.75)
Remove duplicates and ensure embedding_version matches canonical_state.embedding_version
D. Build LLM Payload
Compose: system_prompt + rolling_summary + canonical_state.key_facts + filtered_chunks + recent_messages + user_input
Keep token count in check; if > window*0.85 → compress rolling_summary or prune chunks
E. Post-LLM Handling
Save tool results to task_cache with task_id and TTL
If LLM indicates state delta, PUT /state to merge canonical_state
If summary needs update, enqueue summarizer worker
5. Sample Implementation (Node.js + TypeScript)
File: src/controllers/sessionController.ts (high-level pseudocode)
import express from 'express';
import Redis from 'ioredis';
import { embed, cosine } from './similarityService';
import { vectorSearch } from './vectorClient';

const router = express.Router();
const redis = new Redis(process.env.REDIS_URL);

router.post('/:sessionId/query', async (req, res) => {
  const sessionId = req.params.sessionId;
  const { step_id, user_input, recent_messages, metadata_filters, top_k = 8, min_score = 0.75 } = req.body;

  // 1. Load session (loadSession is a service helper over Redis/Postgres)
  const session = await loadSession(sessionId);
  if (!session) return res.status(404).send({ error: 'session not found' });

  // 2. Step validation: reject out-of-order steps
  if (step_id <= session.last_step_id) return res.status(409).send({ error: 'step out-of-order' });

  // 3. Drift check against the last stored context embedding
  const promptEmbed = await embed(user_input + recent_messages.join('\n'));
  const lastContextEmbed = session.last_context_embedding;
  const sim = cosine(promptEmbed, lastContextEmbed);
  let rehydrated = false;
  if (sim < parseFloat(process.env.DRIFT_THRESHOLD || '0.65')) {
    // pull canonical_state + rolling summary back into the payload
    rehydrated = true;
  }

  // 4. Query vector DB, then apply min_score + embedding_version checks
  const results = await vectorSearch(user_input, { filters: metadata_filters, topK: top_k });
  const filtered = results.filter(r =>
    r.score >= min_score &&
    r.metadata.embedding_version === session.canonical_state.embedding_version);

  // 5. Prepare LLM payload (rolling_summary + session state + filtered chunks + last messages)
  const payload = buildPayload(session, filtered, recent_messages, user_input);

  // 6. Respond with the context actually used
  return res.json({ session_id: sessionId, step_id, rehydrated, llm_payload: payload });
});

export default router;
6. Rolling Summarizer Worker (Flow)
Worker reads session_id from job queue
Loads full conversation history (from event store/DB)
Calls LLM summarizer with prior_summary + new_history
Writes new rolling_summary record and keeps a small verbatim tail (e.g., last 5 messages)
Updates session.rolling_summary_id atomically
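The worker loop, sketched in Python for brevity (the service itself is Node.js); queue, history_store, summary_store, session_db, and summarize_llm are assumed components:
def summarizer_worker():
    while True:
        session_id = queue.pop()                               # 1. read job from queue
        history = history_store.load(session_id)               # 2. full conversation history
        prior = summary_store.get(session_id)
        new_summary = summarize_llm(prior, history[:-5])       # 3. compress all but the tail
        summary_store.put(session_id, new_summary,
                          tail=history[-5:])                   # 4. keep last 5 messages verbatim
        session_db.set_summary_id(session_id, new_summary.id)  # 5. atomic pointer swap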
7. Semantic Similarity Service
Prefer external embedding provider (OpenAI embeddings, Azure, or internal models)
Cache embeddings for recent items in Redis (ttl 24h)
Use batched embedding calls to reduce cost
8. Vector DB Interface Recommendations
Include metadata.policy_version and embedding_version for every chunk
Use dense vector indexes with HNSW
Store pointer to source (S3 / Blob) and chunk text truncated to 1-2 KB
9. Idempotency & Task Cache
For every tool request include task_id = sha256(session_id + step_id + tool_name + args)
Cache results in Redis: SETNX session:{session_id}:task:{task_id} with TTL
Agent should consult cache before re-executing
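A sketch of this pattern with redis-py; run_tool() is a hypothetical tool backend, and the key layout matches the convention above:
import hashlib
import json
import redis

r = redis.Redis()

def task_id_for(session_id: str, step_id: int, tool_name: str, args: dict) -> str:
    raw = f"{session_id}:{step_id}:{tool_name}:{json.dumps(args, sort_keys=True)}"
    return hashlib.sha256(raw.encode()).hexdigest()

def cached_tool_call(session_id: str, step_id: int, tool_name: str, args: dict, ttl=3600):
    key = f"session:{session_id}:task:{task_id_for(session_id, step_id, tool_name, args)}"
    cached = r.get(key)
    if cached:                                        # consult cache before re-executing
        return json.loads(cached)
    result = run_tool(tool_name, args)                # assumed tool backend
    r.set(key, json.dumps(result), nx=True, ex=ttl)   # SETNX-style write with TTL
    return result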
10. Observability & Alerts
Emit metrics:
sessions.active (gauge)
drift.detected (counter)
vector.filter.rate (ratio)
rolling_summary.jobs (histogram)
Logs should include structured fields: session_id, step_id, tenant_id, job_id
Alerts:
High drift rate (>5% requests)
Many out-of-order steps
Vector DB min_score misses
11. Security & Governance
Tenant scoping on every read
RBAC between services (service accounts)
Audit trail for canonical state writes
Redact PII in rolling_summary unless explicitly allowed
12. Deployment (Kubernetes)
Deploy as 3 components: api, worker, similarity-service
Use Horizontal Pod Autoscaler on CPU/Queue length
Use managed Redis (ElastiCache / Azure Cache) and Postgres for durable state
Use cert-manager for TLS in cluster
13. Integration Tests
Tests to cover: session creation, out-of-order step rejection, drift detection, vector filter behavior, rolling summary update, idempotent task calls
14. Example Config (env)
PORT=8080
REDIS_URL=redis://...:6379
POSTGRES_URL=postgres://...
VECTOR_DB_ENDPOINT=https://...
EMBEDDING_API_KEY=xxx
DRIFT_THRESHOLD=0.65
MIN_VECTOR_SCORE=0.75
ROLLING_SUMMARY_TRIGGER_STEPS=10
15. Next Steps / Extensions
Provide a ready-made SDK (Node/Python/Java) to integrate with agents
Add an admin UI to inspect session state and rehydrate manually
Add differential summarization (only include policy changes)