
Interpolated Inputs in Prompts

  • Writer: Anand Nerurkar
  • Nov 25
  • 18 min read

Interpolated inputs in prompts refer to dynamic variables that are “injected” into a prompt at runtime instead of being hard-coded. They allow you to build prompt templates whose parameters change based on context, user input, or system data.

What Are Interpolated Inputs?

It's like having placeholders inside your prompt that get filled in at runtime.

Example (simple)

Template prompt:

You are analyzing a loan application. Applicant name: {{name}}, 
Credit score: {{credit_score}}, 
Loan amount: {{amount}}.
Provide risk assessment.

Runtime inputs get interpolated:

  • name = "Ravi"

  • credit_score = 715

  • amount = 12,00,000

Interpolated final prompt becomes:

You are analyzing a loan application. Applicant name: Ravi,
Credit score: 715,
Loan amount: 12,00,000.
Provide risk assessment.

✅ Why Use Interpolated Inputs?

1. Reuse same prompt for many scenarios

You create one template → fill values dynamically.

2. Prevent hard-coding

Useful in applications, pipelines, agents, workflows.

3. Works with LLM Orchestration tools

  • LangChain

  • LlamaIndex

  • Azure AI Foundry

  • AWS Bedrock Agents

  • OpenAI Assistants API

🔥 Examples of Interpolation

1. With a Plain Python Template (str.format)

prompt = """
Generate loan eligibility.

Name: {name}
Age: {age}
Income: {income}
"""

final_prompt = prompt.format(
    name="Anand", 
    age=35, 
    income=2400000
)

2. With LangChain

from langchain.prompts import PromptTemplate

template = """
Summarize customer complaint.

Customer ID: {cust_id}
Complaint: {complaint}
"""

prompt = PromptTemplate.from_template(template)

final_prompt = prompt.format(
    cust_id="C1021",
    complaint="ATM cash not dispensed but amount deducted."
)

3. With Azure AI Foundry Prompt Flow

YAML prompt:

input_variables:
  - name: policy
  - name: loan_amount

template: |
  You are a loan policy expert.
  Policy: {{policy}}
  Loan amount: {{loan_amount}}
  Give approval decision.

The {{policy}} and {{loan_amount}} get injected dynamically.
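
Double-brace templates like this are Jinja-style, so as a minimal sketch the same interpolation can be reproduced in Python with the jinja2 package (the sample values below are illustrative, not from Prompt Flow itself):

from jinja2 import Template

template = Template(
    "You are a loan policy expert.\n"
    "Policy: {{ policy }}\n"
    "Loan amount: {{ loan_amount }}\n"
    "Give approval decision."
)

# Values arrive at runtime, e.g. from a workflow step or user input
final_prompt = template.render(policy="LTV <= 80%", loan_amount=1200000)
print(final_prompt)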

🔥 Real Banking Use Case Example (GenAI)

Prompt Template

You are an intelligent banking assistant.
KYC extracted fields:
Name: {{name}}
PAN: {{pan}}
Income: {{income}}
Loan Product: {{product}}
Lending Policy Version: {{policy_version}}

Use the above values and respond with:
- Decision
- Reason
- Missing Information

Dynamic Inputs from OCR + ML Pipeline:

{
  "name": "Anand",
  "pan": "ABCDE1234G",
  "income": 2200000,
  "product": "HomeLoan",
  "policy_version": "v2.1"
}

Prompt becomes personalized for each customer.

🧠 Where Interpolated Inputs Are Critical?

LLM Agents

Agent gets facts → injects into system prompt.

RAG / Vector Search

Chunks retrieved → added as interpolated context:

Relevant policy: {{retrieved_context}}
User query: {{query}}
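
A minimal sketch of that injection step, assuming retrieved_chunks is the list of records returned by your vector store:

RAG_TEMPLATE = """Relevant policy: {retrieved_context}
User query: {query}"""

def build_rag_prompt(retrieved_chunks, query):
    # Join the retrieved chunk texts into one context block
    context = "\n".join(chunk["text"] for chunk in retrieved_chunks)
    return RAG_TEMPLATE.format(retrieved_context=context, query=query)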

MLOps + LLMOps pipelines

Data from ML model → injected into LLM prompt.

Back Office Automation

Ticket ID, transaction data, logs → interpolated.

📌 Summary

Term                 | Meaning
Interpolated Inputs  | Dynamic variables injected into prompts
Purpose              | Re-use templates, keep prompts clean, adapt to context
Used in              | RAG, Agents, Automation, MLOps, LLMOps
Syntax               | {variable}, {{variable}}, $variable based on framework


How to confirm tool responses are valid and prevent the agent from requeuing the same task

=====

Below is a clear, production-grade method to confirm tool responses are valid and prevent an agent from re-queuing the same task. This applies to OpenAI Agents, LangChain Agents, Azure Agents, and any autonomous workflow with tool calls.

Problem

LLM agents sometimes:

  • Get invalid tool responses (empty, malformed, missing fields)

  • Re-queue the same tool request repeatedly

  • Fall into “looping behaviour”

Solution Overview

You must implement three layers of safety:

  1. Tool Response Schema Validation

  2. Idempotency + Task Completion Checks

  3. Agent Memory / Context Guardrails (Loop Prevention)

🟦 1. Tool Response Schema Validation (Hard Guarantee)

Every tool must return strict JSON with validated fields.

Example Schema

{
  "status": "success | failure",
  "task_id": "string",
  "data": {},
  "error_message": "string or null"
}

Agent Instructions

Inside your system prompt / agent definition, enforce:

All tool results MUST include:
- status
- task_id
- data or error_message

If results do not follow schema, ask the tool to resend the response.
NEVER requeue the same task unless the tool returns a 'retry' flag.

Backend Check

Your tool implementation must:

  • Validate outgoing fields

  • Reject malformed responses before sending to LLM
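
A minimal backend guard for those two checks, as a sketch (the function name is illustrative; the required fields come from the schema above):

REQUIRED_FIELDS = {"status", "task_id", "data", "error_message"}

def validate_outgoing_response(response: dict) -> dict:
    # Reject malformed payloads before they ever reach the LLM
    missing = REQUIRED_FIELDS - response.keys()
    if missing:
        raise ValueError(f"Tool response missing fields: {missing}")
    if response["status"] not in ("success", "failure"):
        raise ValueError(f"Invalid status: {response['status']}")
    return response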

🟦 2. Idempotency Keys + Task Completion Markers

To avoid infinite loops, the agent must NEVER re-run the same tool call unless explicitly required.

Implement:

A. Idempotency Key

Each task gets a consistent ID:

task_id = hash(user_input + context)

B. Task Status Table

task_id | status    | last_updated
xyz123  | completed | 15:30

C. Agent Enforcement

Add instruction:

Before calling a tool, check if this task_id was already executed.
If yes, do not call the tool again. Instead use the existing result.

🟦 3. Loop Prevention Rules for the Agent

In your system prompt:

You must never call the same tool with the same arguments more than once.
If the tool returns an invalid result, do NOT retry automatically.
Ask for human clarification if the same failure happens twice.

Retry Strategy

Attempt | Action
1       | Retry tool once with a clearer message
2       | Ask human for clarification
3       | Abort safely

🟩 Full Example (Production)

System Prompt for Agent

When using tools:
1. Each tool call must include a unique task_id.
2. Before calling a tool, check if the task_id has a known status of "completed".
   If completed, do NOT call the tool again.
3. Validate tool responses:
   - Must include: status, task_id, data, error_message
   - If the response is invalid, ask the tool for a corrected response ONCE.
4. Never attempt the same tool with identical arguments more than once.
5. If the tool fails twice, escalate to the user.

🟦 Tool Implementation Pattern

Tool returns:

{
  "status": "success",
  "task_id": "loan_eval_1021",
  "data": {
     "decision": "approved",
     "score": 91
  },
  "error_message": null
}

Agent validates before continuing:

  • schema correct?

  • task completed?

  • data is usable?

If not → agent requests corrected output, NOT tool re-execution.

🟥 The Biggest Mistakes (You Must Avoid)

❌ Agent retries same tool call with same args → infinite loop
❌ Tools returning inconsistent JSON
❌ No schema enforcement
❌ No idempotency → multiple calls for same job
❌ Agent tries to “fix” tool output by calling tool again


Below are all three items — complete, production-ready, and tailored for GenAI + banking + automation + AI engineering teams.

1. Loop Prevention Guardrail Block (System Prompt)

Copy–paste directly into your Agent system prompt. This prevents re-queues, duplicate tool calls, invalid responses, and agent loops.

🔒 Loop Prevention & Tool Validation Guardrail Block

TOOL USAGE RULES — STRICT POLICY

1. EVERY tool call MUST contain:
   - task_id (string, unique for the given input)
   - parameters (arguments passed to tool)

2. BEFORE calling a tool, you MUST check:
   - Has this exact task_id been executed already?
   - Has this tool already been invoked with identical arguments?

   If YES → DO NOT call the tool again.
   Instead, reuse the prior tool response.

3. TOOL RESPONSE VALIDATION:
   Every tool response must contain the following fields:
   - status: "success" | "failure"
   - task_id: string
   - data: JSON object (mandatory for success)
   - error_message: string or null (mandatory for failure)

4. If the tool response is malformed or missing fields:
   - Ask the tool ONCE for a corrected response.
   - Do NOT requeue the same tool task with the same arguments.

5. RETRY POLICY:
   - Only ONE retry allowed for malformed tool output.
   - If the tool fails twice, escalate to the human user.
   - Never loop or call the same tool again automatically.

6. IDEMPOTENCY:
   - Never call a tool more than once with the same (task_id + arguments).
   - Treat all tool operations as idempotent unless the tool explicitly returns:
       { "status": "failure", "retry": true }

7. SAFETY:
   - If conflict or ambiguity exists, STOP and ask for clarification.
   - Never hallucinate a tool response if a tool fails.

You MUST strictly follow these rules to prevent infinite loops, duplicate work, or unsafe tool execution.

2. Production Tool Response Schema Templates

These schemas must be returned by every tool your agent uses (ML tool, retrieval tool, vector DB tool, OCR tool, KYC tool, transaction tool, etc.).

🟦 Standard Banking/GenAI Tool Response (Success)

{
  "status": "success",
  "task_id": "loan_eval_20251124_001",
  "data": {
    "decision": "approved",
    "approval_score": 0.92,
    "policy_version": "v2.1"
  },
  "error_message": null
}

🟥 Standard Failure Schema (Non-retryable)

{
  "status": "failure",
  "task_id": "loan_eval_20251124_001",
  "data": {},
  "error_message": "Missing income field in request"
}

🟧 Retryable Error Schema (Retry Allowed Once)

Use only when tool logic says retry is safe.

{
  "status": "failure",
  "task_id": "loan_eval_20251124_001",
  "data": {},
  "error_message": "Temporary DB connection issue",
  "retry": true
}

🟩 Validation Rules to Enforce

You must reject responses if:

❌ “status” missing
❌ “task_id” missing
❌ “data” missing on success
❌ “error_message” missing on failure
❌ Unexpected fields
❌ Output is not valid JSON

The agent must say:

“Tool response invalid — resend using required schema.”
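
A sketch of those rules as a strict validator, which could back the validate_tool_schema call used in the code examples below (field names follow the schemas above; the allowlist and return-bool style are assumptions):

ALLOWED_FIELDS = {"status", "task_id", "data", "error_message", "retry"}

def validate_tool_schema(response) -> bool:
    # Must be a parsed JSON object, not a raw string
    if not isinstance(response, dict):
        return False
    # Reject unexpected fields
    if response.keys() - ALLOWED_FIELDS:
        return False
    # status and task_id are always required
    if response.get("status") not in ("success", "failure") or "task_id" not in response:
        return False
    # data mandatory on success, error_message mandatory on failure
    if response["status"] == "success" and "data" not in response:
        return False
    if response["status"] == "failure" and response.get("error_message") is None:
        return False
    return True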

3. Full Agent Code Examples (Idempotent + Loop-Safe)

Below are production-ready patterns for:

  • OpenAI Assistants API

  • LangChain Agents

  • Azure AI Foundry Agents


🟦 A. OpenAI Assistants API Example (Production Template)

import hashlib
import json
from openai import OpenAI
client = OpenAI()

# ---- Idempotency store ----
completed_tasks = {}  # task_id -> response

def get_task_id(input_data):
    return hashlib.sha256(json.dumps(input_data, sort_keys=True).encode()).hexdigest()

def safe_tool_call(tool_name, arguments, task_id):
    # Idempotency: reuse the cached result if this task already completed
    if task_id in completed_tasks:
        return completed_tasks[task_id]

    # call_tool_backend is a placeholder for your tool transport
    # (HTTP call, function dispatch, queue, etc.)
    response = call_tool_backend(tool_name, arguments)

    # Validate the response schema before the agent ever sees it
    if not validate_tool_schema(response):
        raise ValueError("Invalid tool response schema")

    # Save the result before returning so repeat calls hit the cache
    completed_tasks[task_id] = response
    return response

assistant = client.beta.assistants.create(
    name="BankingAutomationAgent",
    model="gpt-5.1",
    instructions=open("guardrail_block.txt").read(),
    tools=[...]
)

This gives:
✔ No double calls
✔ Schema validation
✔ Prevents loops
✔ Reuses previous responses

🟦 B. LangChain ReAct Agent Example (Loop Safe)

from langchain.agents import initialize_agent, AgentType
from langchain.tools import Tool

task_cache = {}

def loan_eval_tool(args):
    task_id = args["task_id"]

    # Idempotency check: reuse the cached result for this task_id
    if task_id in task_cache:
        return task_cache[task_id]

    # backend_loan_eval and validate_schema are placeholders for your
    # loan-evaluation backend and schema validator
    response = backend_loan_eval(args)

    if validate_schema(response):
        task_cache[task_id] = response
        return response
    else:
        raise ValueError("Invalid schema")

loan_tool = Tool(
    name="loan_evaluator",
    func=loan_eval_tool,
    description="Evaluates loan application."
)

agent = initialize_agent(
    tools=[loan_tool],
    agent=AgentType.OPENAI_FUNCTIONS,
    verbose=True,
    max_iterations=4,     # prevents infinite loops
    handle_parsing_errors=True
)

🟦 C. Azure AI Foundry Agent Example

task_registry = {}

def call_tool(tool_name, payload):
    task_id = payload["task_id"]

    # Do not call again if this task already completed
    if task_id in task_registry:
        return task_registry[task_id]

    # run_azure_tool and is_valid_response are placeholders for the
    # Azure tool invocation and your schema validator
    result = run_azure_tool(tool_name, payload)

    if is_valid_response(result):
        task_registry[task_id] = result
        return result

    raise Exception("Invalid tool response")

In Azure, include your system guardrail block in the agent definition.

🚀 You Now Have All 3 Deliverables

1. 🔒 Loop Prevention Guardrail Block → copy into system prompt

2. 🟦 Tool Response Schemas → use for every tool

3. 🟩 Agent Code Examples → OpenAI, LangChain, Azure

These guarantee:

  • No infinite loops

  • No duplicate calls

  • No re-queued tasks

  • Safe tool orchestration

  • Stable and predictable agent behaviour


How to confirm context is passed correctly (session_id, step_id, vector DB pointers) and check for context truncation (token limits) or accidental resets

====

Below is the complete, production-grade approach to confirm context is passed correctly — including session_id, step_id, vector DB pointers, and how to detect + prevent context truncation or accidental resets.

This is exactly what high-maturity AI engineering teams (banking/enterprise/SaaS) implement.

1. How to Confirm Context Is Passed Correctly

Context includes:

  • session_id (conversation/session identity)

  • step_id (workflow or agent step identity)

  • vector DB pointers (retrieved context, chunk IDs, embedding versions, metadata)

  • state (task status, tool results, memory)

  • user intent + previous messages

You must validate each layer.

🟩 1A. Validate Session Context (session_id)

What to check:

  • Same session_id is available in every step

  • Agent/system does not regenerate a new ID

  • Backend consistently injects session during each request

Add guardrail to system prompt:

You MUST include the provided session_id in every tool call and every response.
If session_id is missing at any point, stop and request it again.

Backend validation:

def validate_session(request):
    if "session_id" not in request:
        raise ValueError("Missing session_id")
    if request["session_id"] != expected_session:
        raise ValueError("Session mismatch")

🟦 1B. Validate Step Context (step_id)

Each action must carry a step_id so the agent cannot mix steps.

Enforce:

  • step_id increment

  • step used only once

  • step not reused across tools

System rule:

Every tool call must include:
- session_id
- step_id

The agent must verify step_id order and never reuse older step_ids.

Backend check:

if step_id <= last_step_id:
    raise Exception("Out-of-order step. Possible context reset.")

🟪 1C. Validate Vector DB Context (RAG pointers)

To confirm vector search results are correct:

Each retrieved chunk must include:

  • chunk_id

  • document_id

  • policy_version

  • embedding_version

  • source_pointer (S3/Blob/Git path)

  • score

  • metadata

Example returned record:

{
  "chunk_id""loan_policy_v2_chunk_34",
  "doc_id""loan_policy_v2",
  "embedding_version""v3.2",
  "score"0.91,
  "source""blob://policy/loan_policy_v2.pdf",
  "text""Loan > ₹15 lakh requires income proof..."
}

Agent guardrail:

Before answering, verify that:
1. All retrieved chunks include embedding_version and chunk_id.
2. All chunks come from a single, consistent policy_version.
If missing or inconsistent → requery the vector DB.

🟧 2. Detecting Context Truncation (Token Limits)

Symptoms:

  • Agent “forgets” session info

  • Missing earlier tool results

  • Vector pointers disappear

  • Step history lost

  • Unexpected contradictions

How to Detect:

A. Add a context checksum

Include in every message:

context_hash = hash(session_id + last_step_id + key_state_variables)

If model returns different hash → context dropped.
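
A runnable sketch of the checksum, assuming string state values; parse_hash_from_response and rebuild_context stand in for your own plumbing:

import hashlib

def context_hash(session_id: str, last_step_id: int, key_state: dict) -> str:
    # sha256 gives a stable fingerprint across processes, unlike built-in hash()
    payload = f"{session_id}|{last_step_id}|{sorted(key_state.items())}"
    return hashlib.sha256(payload.encode()).hexdigest()[:16]

expected = context_hash("abcd-1234", 42, {"policy_version": "v2.1"})
echoed = parse_hash_from_response(llm_response)  # your response parser
if echoed != expected:
    rebuild_context()  # context dropped, trigger rehydration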

B. Track message token size

Before each LLM call:

if tokens(messages) > MAX_CONTEXT_LIMIT * 0.85:
    trigger_compression()

If exceeded:

  • compress old history

  • store in external memory (Redis, DB)
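
A sketch of the token check using tiktoken; the cl100k_base encoding and the 128k window are assumptions (pick values matching your model), and messages / trigger_compression come from the pseudocode above:

import tiktoken

enc = tiktoken.get_encoding("cl100k_base")  # assumption: your model's encoding
MAX_CONTEXT_LIMIT = 128_000                 # assumption: your model's window

def total_tokens(messages) -> int:
    # messages: list of {"role": ..., "content": ...} dicts
    return sum(len(enc.encode(m["content"])) for m in messages)

if total_tokens(messages) > MAX_CONTEXT_LIMIT * 0.85:
    trigger_compression()  # compress old history + store it in external memory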

C. Use “context heartbeat” markers

Add this to system prompt:

In every response, echo back:
- session_id
- step_id
- active_policy_version
This is mandatory and used to detect context loss.

If missing → context lost.

🟥 3. Detect Accidental Context Resets

Situations where LLM resets:

  • Temperature too high

  • Conversation exceeds token limit

  • Multi-step agent chain broke

  • Wrong system prompt injection

  • Function/tool calls override context

How to detect:

A. Insert sentinel markers

Example:

[SENTINEL_SESSION_START: {{session_id}}]
[SENTINEL_POLICY_VERSION: v2.1]

Check for presence in next model response.

If missing → context reset → force rebuild.
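
A minimal presence check for the sentinel, as a sketch (response_text is the raw model output; rebuild_context is your rebuild hook):

import re

SENTINEL = re.compile(r"\[SENTINEL_SESSION_START: (?P<sid>[\w-]+)\]")

def session_sentinel(response_text: str, expected_session_id: str) -> bool:
    # True only if the sentinel is present AND carries the right session
    m = SENTINEL.search(response_text)
    return bool(m) and m.group("sid") == expected_session_id

if not session_sentinel(response_text, "abcd-1234"):
    rebuild_context()  # context reset detected, force rebuild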

B. Last-known-state shadow copy

Store last state externally:

shadow_state = {
  "session_id": ...,
  "step_id": ...,
  "policy_version": ...,
  "embedding_version": ...,
  "task_cache": ...
}

After every LLM response:

if llm_response.state != shadow_state:
    print("Context mismatch detected")

C. Defensive prompting

Include this:

If any required context variable disappears (session_id, step_id, embedding_version, retrieved_chunks),
STOP EXECUTION and request the missing context.
Do not hallucinate missing variables.

🟩 4. Full Production Template to Add into Agent System Prompt

CONTEXT INTEGRITY RULES

1. ALWAYS carry forward:
   - session_id
   - step_id
   - vector_db_chunk_ids
   - policy_version
   - embedding_version

2. If any required context variable is missing, ambiguous, or inconsistent,
STOP and request it again.

3. Echo back:
   - session_id
   - step_id
   - policy_version
   in every intermediate step. 
This ensures detection of accidental context resets.

4. Verify vector DB results:
   - Each record must include chunk_id, doc_id, embedding_version, and metadata.
   - If any field is missing → requery the vector DB.

5. You must never proceed with missing state, partial context, or truncated context.
If truncation is suspected → request full context refresh.

🟩 5. How to Test Context Passing (Red-Team Tests)

Test 1 — Token Flood

Feed huge context and ensure agent still retains:

  • session_id

  • step_id

  • chunk pointers

Test 2 — Simulated Truncation

Remove mid-history and see if the agent requests context refresh.

Test 3 — Vector DB Pointer Swap

Switch embedding version and see if agent detects mismatch.

Test 4 — Session ID Tampering

Modify session ID mid-step → agent should stop.
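
As a sketch, Test 4 might look like this in pytest; create_session, agent_step, and SessionMismatchError are all assumed names from your own test harness:

import pytest

def test_session_id_tampering():
    session = create_session("abcd-1234")          # assumed harness helper
    agent_step(session, step_id=1, user_input="Check loan status")

    # Tamper with the session ID mid-conversation
    session["session_id"] = "evil-9999"
    with pytest.raises(SessionMismatchError):      # agent must stop, not continue
        agent_step(session, step_id=2, user_input="Continue")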

✅ Summary

To confirm context is passed correctly, you must validate:

Context Item        | Validation Method
session_id          | continuity + heartbeat
step_id             | monotonic increment + no reuse
vector DB pointers  | chunk_id + embedding_version consistency
policy_version      | alignment across chunks
state               | schema validation + shadow copy
token truncation    | heartbeat markers + context_hash
context resets      | sentinel + mandatory echo-back


How to use rolling summaries (condense older history into a summary and store it in memory), store canonical state in a lightweight session DB and pass only the needed pieces to the model, use retrieval filters (metadata + recency) with a minimum similarity threshold, and detect drift: log semantic similarity between the current prompt and the last-step context; if below threshold, rehydrate state

=======

Below is the complete, production-grade implementation guide for all four capabilities:

  1. Rolling summaries

  2. Lightweight session DB for canonical state

  3. Retrieval filters + minimum similarity threshold

  4. Drift detection using semantic similarity + state rehydration

These patterns are used in ChatGPT-level agent frameworks, enterprise GenAI systems (banking/BFSI), multimodal agents, workflow orchestrators, and RAG platforms.

✅ 1. Rolling Summaries

Condense older conversation or tool-call history while keeping important context.

🎯 Goal

Keep LLM context small, but preserve meaning, decisions, state, and commitments.

🔹 A. Architecture Pattern

Full history (messages + tool results) 
      → Summarizer LLM 
          → rolling_summary

Use:

  • First 20–30 messages → compress

  • Anything older than N tokens → move to summary

  • Keep last 3–5 messages verbatim (never compress)

🔹 B. Rolling Summary Prompt Template

You are compressing conversation history.

Given the following previous summary and new messages:
- Previous summary: {{prior_summary}}
- New interactions: {{recent_history}}

Output:
1. Updated summary including decisions, commitments, variables.
2. A state delta (what changed).
3. A list of canonical facts with keys (session_id, step_id, policy_version, embedding_version).

Do NOT remove:
- session identifiers
- policies loaded
- tool results
- decisions made

🔹 C. When to Trigger Rolling Summary

Trigger when:

  • token usage > 70% of model’s context window

  • after each major tool call

  • at fixed intervals (e.g., every 10 steps)
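
A sketch of the update step wiring these triggers together, assuming the section-B template is stored in ROLLING_SUMMARY_PROMPT and llm() is your completion call:

def update_rolling_summary(prior_summary, recent_history):
    # Fill the summarizer template and ask the LLM to compress history
    prompt = (ROLLING_SUMMARY_PROMPT
              .replace("{{prior_summary}}", prior_summary)
              .replace("{{recent_history}}", "\n".join(recent_history)))
    return llm(prompt)  # your LLM completion call

def maybe_compress(summary, history, used_tokens, window, step):
    # Triggers from the list above: 70% token usage or every 10 steps
    if used_tokens > 0.70 * window or step % 10 == 0:
        summary = update_rolling_summary(summary, history[:-5])
        history = history[-5:]  # keep the last 5 messages verbatim
    return summary, history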

🔹 D. Store Summaries Externally

Store in:

  • Redis

  • DynamoDB

  • MongoDB

  • Postgres

  • Azure Cosmos

  • Local KV store

Structure:

{
  "session_id": "12345",
  "rolling_summary": "...condensed text...",
  "last_updated": "2025-11-24T18:20"
}

✅ 2. Store Canonical State in Lightweight Session DB

Only pass essential state to the model, not the entire context.

🔹 A. Canonical State Examples

State includes:

  • session_id

  • step_id

  • last completed action

  • policy version

  • embedding version

  • vector DB chunk_ids

  • key variables (loan amount, risk score, user objective)

  • tool results

🔹 B. Canonical State DB Schema

{
  "session_id": "abcd1234",
  "step_id": 42,
  "canonical_state": {
    "policy_version": "v3.1",
    "embedding_version": "v1.5",
    "active_chunks": ["loan_v3_c34", "loan_v3_c35"],
    "loan_amount": 1200000,
    "risk_score": 0.89
  },
  "updated_at": "2025-11-24T18:21:00"
}
}

🔹 C. Pattern for Passing State to LLM

Instead of full history → send:

  • rolling_summary

  • canonical_state

  • last 3 messages

  • user prompt

Example:

{
  "rolling_summary": "...",
  "canonical_state": {...},
  "user_query": "...",
  "recent_messages": [...]
}

This prevents context loss & token explosion.
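
A sketch of the packing step; the field names follow the structure above, while load_session and the message store layout are assumptions:

def build_llm_payload(session_id: str, user_query: str) -> dict:
    session = load_session(session_id)  # your session DB lookup
    return {
        "rolling_summary": session["rolling_summary"],
        "canonical_state": session["canonical_state"],
        "recent_messages": session["messages"][-3:],  # last 3 verbatim
        "user_query": user_query,
    }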

✅ 3. Use Retrieval Filters + Minimum Similarity Threshold

Ensures ONLY relevant chunks are used, and irrelevant junk is filtered out.

🔹 A. Retrieval Configuration

Filters:

  • metadata filters

  • recency filters

  • version filters

VDB Query Example:

results = vector_db.search(
    query_embedding,
    top_k=12,
    filters={
        "policy_version": "v3.1",
        "type": "lending_policy",
        "is_active": True,
        "tenant_id": "ICICI"
    }
)

🔹 B. Minimum Similarity Threshold

If similarity < 0.75 → discard chunk.

filtered = [r for r in results if r.score >= 0.75]

If no chunks meet threshold → fallback:

"Requery using broad search" OR "Ask user for clarification"

🔹 C. Include metadata validation inside agent prompt:

You MUST only use vector DB results if:
- metadata.policy_version matches canonical_policy_version
- similarity_score >= 0.75
- chunk_id exists
- source_id is present

If not satisfied, re-query or request clarification.

This eliminates hallucinated or incorrect context.

✅ 4. Detect Drift (Semantic Drift Check)

Check whether the model’s current prompt is consistent with the last-step context. If not → rehydrate.

🔹 A. How to Detect Drift

Compute semantic similarity between:

  • current prompt embedding

  • last_step_context embedding

If similarity < 0.65 → drift detected.

Code:

# embed() is your embedding provider; cosine_similarity can come from
# sklearn.metrics.pairwise or a simple dot-product implementation
sim = cosine_similarity(embed(cur_prompt), embed(last_context))

if sim < 0.65:
    drift = True

🔹 B. Drift Signals

  • Sudden topic change

  • Model forgetting step_id

  • Missing policy version

  • Missing chunk pointers

  • Contradicting previous decisions

  • Tool call arguments mismatched

🔹 C. Drift Handler (Rehydration Flow)

If drift = true:

  1. Pull canonical_state from session DB

  2. Pull rolling_summary

  3. Rehydrate context:

    system_prompt + rolling_summary + canonical_state

  4. Reinsert last user query

  5. Continue execution safely
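
The flow above as a sketch; load_session, load_summary, SYSTEM_PROMPT, and llm (taking system and user text) are assumed helpers:

def rehydrate_and_continue(session_id: str, user_query: str) -> str:
    state = load_session(session_id)       # 1. canonical_state from session DB
    summary = load_summary(session_id)     # 2. rolling_summary
    context = (                            # 3. rehydrate the context block
        f"{SYSTEM_PROMPT}\n\n"
        f"Summary so far:\n{summary}\n\n"
        f"Canonical state:\n{state['canonical_state']}"
    )
    # 4. reinsert the last user query and 5. continue execution safely
    return llm(system=context, user=user_query)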

🔹 D. Add Guardrail to System Prompt:

You MUST check semantic similarity between:
- your current response context
- last_step_state.context

If similarity < 0.65, you MUST request context rehydration
from canonical_state and rolling_summary before proceeding.

🎯 Putting It All Together (End-to-End Flow)

User Input → Retrieve canonical_state →
Retrieve rolling_summary →
Check drift →
If drift → Rehydrate context →
Perform RAG with filters + similarity threshold →
Update summary & canonical_state → Next step

🔥 Final Deliverable: Complete Playbook for Production

Your system now handles:

  • Conversation stability

  • State continuity

  • No context truncation

  • No accidental resets

  • Correct RAG behavior

  • Minimal hallucination

  • Drift detection

  • Guaranteed step-by-step consistency

⚙️ Below is a full microservice architecture for:

  • Rolling summaries

  • Semantic drift detection

  • Context rehydration

  • Canonical state DB

  • RAG filters
Context Integrity Microservice — Rolling Summary & Session DB

Purpose: A production-ready microservice that ensures conversation context integrity for GenAI agents: rolling summaries, canonical session DB, retrieval filtering, semantic-drift detection, and state rehydration.

Targeted for banking/enterprise GenAI workflows.

1. Architecture Overview

Components:

  • API Gateway / Ingress — front door for client requests / agents.

  • Context Service (this microservice) — exposes REST/gRPC endpoints to manage session state, rolling summaries, and drift checks.

  • Session Store (Lightweight DB) — Redis (primary), fallback to Postgres for durability.

  • Rolling Summarizer Worker — uses LLM to compress older history (invoked synchronously or via queue).

  • Vector DB Interface — connector for Pinecone/Weaviate/pgvector; returns chunk metadata + embeddings.

  • Semantic Similarity Service — embedding provider + cosine calculator (can use external provider or local model service).

  • Task Cache / Idempotency Store — Redis keys for task_id -> result mapping.

  • Audit & Logs — ELK / Loki + structured logs.

  • Observability — Prometheus metrics, Grafana dashboards, distributed tracing (OpenTelemetry).

Sequence (high-level):

  1. Agent sends POST /v1/session/{session_id}/query with user input + optional recent messages.

  2. Context Service validates session, retrieves canonical state, rolling summary, and recent messages.

  3. Run drift check (semantic sim). If drift, rehydrate and include canonical_state + rolling_summary.

  4. Query Vector DB with metadata filters and min similarity threshold.

  5. Call LLM with packed context (rolling_summary + canonical_state + filtered chunks + recent messages + user query).

  6. Update canonical_state & rolling_summary asynchronously if needed.

2. API Contract (REST)

Authentication

  • JWT (short-lived) or mTLS for service-to-service calls

Endpoints

Create/Touch Session
POST /v1/session
Payload: { session_id, tenant_id, initial_state? }
Returns: 201, session metadata

Get Session
GET /v1/session/{session_id}
Returns canonical state, rolling_summary pointer, last_step_id, last_updated

Query (main)
POST /v1/session/{session_id}/query
Payload: { step_id, user_input, recent_messages[], metadata_filters?, top_k?, min_score? }
Behavior:

  • Validate session + step ordering

  • Run drift check

  • Retrieve canonical_state + rolling_summary

  • Query vector DB with filters and min_score

  • Return: { session_id, step_id, context_used: { rolling_summary_id, chunk_ids }, filtered_chunks[], rehydrated: bool } plus llm_payload (optional)

Update Canonical State (internal)
PUT /v1/session/{session_id}/state
Payload: { step_id, canonical_state_delta }
Behavior: Merge atomically; increment step_id if provided.

Get Rolling Summary
GET /v1/session/{session_id}/summary
Returns the current summary text and version

Trigger Summary Compression
POST /v1/session/{session_id}/summary/compact
Payload: { trigger_reason, force?: bool }
Behavior: Enqueue summarizer worker; returns job id.

Health / Metrics
GET /health and GET /metrics

3. Data Models

Session Document (Redis hash / Postgres row)

{
  "session_id": "abcd-1234",
  "tenant_id": "icici",
  "last_step_id": 42,
  "canonical_state": {
    "policy_version": "v3.1",
    "embedding_version": "v1.5",
    "active_chunks": ["loan_v3_c34", "loan_v3_c35"],
    "key_facts": {"loan_amount": 1200000}
  },
  "rolling_summary_id": "rs_abcd_20251124_01",
  "rolling_summary_version": 3,
  "task_cache_key": "session:abcd:tasks",
  "created_at": "2025-11-24T18:21:00Z"
}

Rolling Summary Record (DB table / KV)

{
  "summary_id": "rs_abcd_20251124_01",
  "session_id": "abcd-1234",
  "summary_text": "<condensed text>",
  "last_messages_kept": 5,
  "created_at": "2025-11-24T18:21:00Z"
}

Vector DB Pointer

{
  "chunk_id": "loan_v3_c34",
  "doc_id": "loan_policy_v3",
  "embedding_version": "v3.2",
  "source": "s3://policies/loan_policy_v3.pdf",
  "score": 0.92,
  "metadata": { "policy_version": "v3.1", "tenant_id": "icici" }
}

4. Core Logic (Implementation Patterns)

A. Request Validation & Session Guard

  • Verify session_id present

  • Verify step_id monotonic (step_id > last_step_id)

  • Validate JWT and tenant scoping

  • Acquire lightweight lock (Redis SETNX) for session write operations

B. Drift Check

  • Embed user_input + recent_messages -> vector

  • Compare with rolling_summary_embedding or last_context_embedding

  • If cosine_sim < DRIFT_THRESHOLD (e.g., 0.65) → set rehydrate=true

C. Vector DB Retrieval

  • Build query with metadata_filters (policy_version, tenant_id, doc_type)

  • Search top_k (configurable)

  • Apply min_score filter (e.g., 0.75)

  • Remove duplicates and ensure embedding_version matches canonical_state.embedding_version

D. Build LLM Payload

  • Compose: system_prompt + rolling_summary + canonical_state.key_facts + filtered_chunks + recent_messages + user_input

  • Keep token count in check; if > window*0.85 → compress rolling_summary or prune chunks

E. Post-LLM Handling

  • Save tool results to task_cache with task_id and TTL

  • If LLM indicates state delta, PUT /state to merge canonical_state

  • If summary needs update, enqueue summarizer worker

5. Sample Implementation (Node.js + TypeScript)

File: src/controllers/sessionController.ts (high-level pseudocode)

import express from 'express';
import Redis from 'ioredis';
import { embed, cosine } from './similarityService';
import { vectorSearch } from './vectorClient';
import { loadSession, buildPayload } from './sessionService'; // assumed helper module

const router = express.Router();
const redis = new Redis(process.env.REDIS_URL);

router.post('/:sessionId/query', async (req, res) => {
  const sessionId = req.params.sessionId;
  const { step_id, user_input, recent_messages, metadata_filters, top_k = 8, min_score = 0.75 } = req.body;

  // 1. Load session
  const session = await loadSession(sessionId);
  if (!session) return res.status(404).send({ error: 'session not found' });

  // 2. Step validation (monotonic ordering)
  if (step_id <= session.last_step_id) return res.status(409).send({ error: 'step out-of-order' });

  // 3. Drift check against the last-step context embedding
  const promptEmbed = await embed(user_input + recent_messages.join('\n'));
  const sim = cosine(promptEmbed, session.last_context_embedding);
  let rehydrated = false;
  if (sim < parseFloat(process.env.DRIFT_THRESHOLD || '0.65')) {
    // pull canonical_state + rolling summary into the payload
    rehydrated = true;
  }

  // 4. Query vector DB with metadata filters, then apply min similarity + version checks
  const results = await vectorSearch(user_input, { filters: metadata_filters, topK: top_k });
  const filtered = results.filter(
    r => r.score >= min_score && r.metadata.embedding_version === session.canonical_state.embedding_version
  );

  // 5. Prepare LLM payload (rolling_summary + session state + filtered chunks + last messages)
  const payload = buildPayload(session, filtered, recent_messages, user_input);
  return res.send({ session_id: sessionId, step_id, rehydrated, llm_payload: payload });
});

export default router;

6. Rolling Summarizer Worker (Flow)

  • Worker reads session_id from job queue

  • Loads full conversation history (from event store/DB)

  • Calls LLM summarizer with prior_summary + new_history

  • Writes new rolling_summary record and keeps a small verbatim tail (e.g., last 5 messages)

  • Updates session.rolling_summary_id atomically

7. Semantic Similarity Service

  • Prefer external embedding provider (OpenAI embeddings, Azure, or internal models)

  • Cache embeddings for recent items in Redis (ttl 24h)

  • Use batched embedding calls to reduce cost

8. Vector DB Interface Recommendations

  • Include metadata.policy_version and embedding_version for every chunk

  • Use dense vector indexes with HNSW

  • Store pointer to source (S3 / Blob) and chunk text truncated to 1-2 KB

9. Idempotency & Task Cache

  • For every tool request include task_id = sha256(session_id + step_id + tool_name + args)

  • Cache results in Redis: SETNX session:{session_id}:task:{task_id} with TTL

  • Agent should consult cache before re-executing
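
A sketch of this pattern with redis-py, using the key layout from the bullets above; execute_tool stands in for your tool backend:

import hashlib, json
import redis

r = redis.Redis()

def task_key(session_id, step_id, tool_name, args) -> str:
    digest = hashlib.sha256(
        f"{session_id}{step_id}{tool_name}{json.dumps(args, sort_keys=True)}".encode()
    ).hexdigest()
    return f"session:{session_id}:task:{digest}"

def run_once(session_id, step_id, tool_name, args, ttl=3600):
    key = task_key(session_id, step_id, tool_name, args)
    cached = r.get(key)
    if cached is not None:
        return json.loads(cached)           # consult cache before re-executing
    result = execute_tool(tool_name, args)  # your tool backend
    # SET with NX: only the first writer stores the result; TTL bounds the cache
    r.set(key, json.dumps(result), nx=True, ex=ttl)
    return result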

10. Observability & Alerts

  • Emit metrics:

    • sessions.active (gauge)

    • drift.detected (counter)

    • vector.filter.rate (ratio)

    • rolling_summary.jobs (histogram)

  • Logs should include structured fields: session_id, step_id, tenant_id, job_id

  • Alerts:

    • High drift rate (>5% requests)

    • Many out-of-order steps

    • Vector DB min_score misses

11. Security & Governance

  • Tenant scoping on every read

  • RBAC between services (service accounts)

  • Audit trail for canonical state writes

  • Redact PII in rolling_summary unless explicitly allowed

12. Deployment (Kubernetes)

  • Deploy as 3 components: api, worker, similarity-service

  • Use Horizontal Pod Autoscaler on CPU/Queue length

  • Use Redis managed (elasticache / azure cache) and Postgres for durable state

  • Use cert-manager for TLS in cluster

13. Integration Tests

  • Tests to cover: session creation, out-of-order step rejection, drift detection, vector filter behavior, rolling summary update, idempotent task calls

14. Example Config (env)

PORT=8080
REDIS_URL=redis://...:6379
POSTGRES_URL=postgres://...
VECTOR_DB_ENDPOINT=https://...
EMBEDDING_API_KEY=xxx
DRIFT_THRESHOLD=0.65
MIN_VECTOR_SCORE=0.75
ROLLING_SUMMARY_TRIGGER_STEPS=10

15. Next Steps / Extensions

  • Provide a ready-made SDK (Node/Python/Java) to integrate with agents

  • Add an admin UI to inspect session state and rehydrate manually

  • Add differential summarization (only include policy changes)


 
 
 
