Python Response Recording and Resumption
This guide covers streaming responses with recording, resumption, and cancellation so users can reconnect after a disconnect and continue where they left off.
The route stays /response-resumption/ for link stability, but the feature name used in the Python docs is “Response Recording and Resumption.”
For the protocol and behavior model, see Response Recording and Resumption Concepts.
Prerequisites
Starting checkpoint: This guide starts from python/examples/langchain/doc-checkpoints/03-with-history
Make sure you’ve completed:
- Python Getting Started - Minimal agent + memory checkpointer
- Python Conversation History - History recording and conversation APIs
Also complete Step 2 in Python Dev Setup (build local memory-service-langchain wheel + UV_FIND_LINKS); this is temporary until the package is released.
Streaming Responses
Checkpoint 05 implements the response recording/resumption pattern directly in the tutorial flow:
POST /chat/{conversation_id}streams SSE frames from live LangChain tokens.MemoryServiceResponseRecordingManager.from_env()records that stream via Memory Service gRPC.GET /v1/conversations/{conversation_id}/resumereplays the same in-progress stream over SSE.POST /v1/conversations/{conversation_id}/cancelcancels both the local stream and the Memory Service recording.
New Imports in Checkpoint 05
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse, StreamingResponse
from langchain.agents import create_agent
from langchain_openai import ChatOpenAI
from memory_service_langchain import (
MemoryServiceCheckpointSaver,
MemoryServiceHistoryMiddleware,
MemoryServiceProxy,
MemoryServiceResponseRecordingManager,
install_fastapi_authorization_middleware,
memory_service_scope,
to_fastapi_response, What changed: The checkpoint imports StreamingResponse and MemoryServiceResponseRecordingManager, and adds small SSE/token helpers (to_sse_chunk, extract_text_chunks).
Why: LangChain token chunks vary by provider/model shape, so extraction and SSE formatting are explicit in the checkpoint. MemoryServiceResponseRecordingManager is the bridge that makes replay/cancel work with the Memory Service response recorder APIs.
Enable gRPC-backed resumption
app = FastAPI(title="Python LangChain Agent With Response Recording and Resumption")
@app.get("/ready") What changed: The recording_manager is created with MemoryServiceResponseRecordingManager.from_env().
Why: MemoryServiceResponseRecordingManager wraps the live response stream, records streamed chunks for the conversation, and provides the shared resume-check, resume, and cancel behavior used by the API endpoints.
What Changes in chat(...)
install_fastapi_authorization_middleware(app)
proxy = MemoryServiceProxy.from_env()
recording_manager = MemoryServiceResponseRecordingManager.from_env()
@app.post("/chat/{conversation_id}")
async def chat(conversation_id: str, request: Request) -> StreamingResponse:
user_message = (await request.body()).decode("utf-8").strip()
if not user_message:
raise HTTPException(400, "message is required")
await proxy.ensure_conversation(
conversation_id,
f"Python checkpoint {conversation_id}",
)
async def source():
with memory_service_scope(conversation_id): What changed: The endpoint reads plain text request body, uses agent.astream(..., stream_mode="messages"), emits PartialResponse SSE events, and wraps the generator with recording_manager.stream_from_source(...).
Why: This keeps the tutorial path minimal while still demonstrating live streaming + resume/cancel integration.
Make sure you define a shell function that can get the bearer token for the bob user:
function get-token() {
curl -sSfX POST http://localhost:8081/realms/memory-service/protocol/openid-connect/token \
-H "Content-Type: application/x-www-form-urlencoded" \
-d "client_id=memory-service-client" \
-d "client_secret=change-me" \
-d "grant_type=password" \
-d "username=bob" \
-d "password=bob" \
| jq -r '.access_token'
}Start a streaming response:
curl -NsSfX POST http://localhost:9090/chat/2946ccfc-cf26-43e1-87fc-f71c944895b1 \
-H "Content-Type: text/plain" \
-H "Authorization: Bearer $(get-token)" \
-d "Write a short story about a cat." Example output:
data: {"text":"Once "}
data: {"text":"upon "}
data: {"text":"a time..."} Response Recording and Resumption APIs
Checkpoint 05 also exposes three endpoints backed by MemoryServiceResponseRecordingManager:
)
return to_fastapi_response(response)
@app.get("/v1/conversations/{conversation_id}/forks")
async def list_forks(conversation_id: str):
response = await proxy.list_conversation_forks(conversation_id)
return to_fastapi_response(response)
@app.post("/v1/conversations/resume-check")
async def resume_check(conversation_ids: list[str]) -> JSONResponse:
return JSONResponse(recording_manager.check(conversation_ids), status_code=200)
@app.get("/v1/conversations/{conversation_id}/resume")
async def resume_response(conversation_id: str):
try:
stream = recording_manager.replay_sse(conversation_id, stream_mode="events")
except ValueError as exc:
raise HTTPException(400, "invalid conversation id") from exc What changed: resume-check queries active recordings, resume uses recording_manager.replay_sse(...), and cancel calls both recording_manager.cancel(...) and proxied Memory Service cancel_response(...).
Why: When a user disconnects mid-stream (page reload, network drop), the LLM is still generating tokens on the server. MemoryServiceResponseRecordingManager lets the client reconnect and receive the remainder of the response without resubmitting the message, and the cancel endpoint lets users abort a generation that is taking too long.
Check whether a conversation has an in-progress response:
curl -sSfX POST http://localhost:9090/v1/conversations/resume-check \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $(get-token)" \
-d '["2946ccfc-cf26-43e1-87fc-f71c944895b1"]' Example output:
[
"2946ccfc-cf26-43e1-87fc-f71c944895b1"
] Resume manually from another terminal:
curl -NsSfX GET http://localhost:9090/v1/conversations/2946ccfc-cf26-43e1-87fc-f71c944895b1/resume \
-H "Authorization: Bearer $(get-token)"
Cancel manually:
curl -sSfX POST http://localhost:9090/v1/conversations/2946ccfc-cf26-43e1-87fc-f71c944895b1/cancel \
-H "Authorization: Bearer $(get-token)"
Completed Checkpoint
Completed code: View the full implementation at python/examples/langchain/doc-checkpoints/05-response-resumption
Next Steps
Continue to:
- Sharing - Membership and ownership transfer APIs
- Indexing and Search - Indexed content and semantic/full-text search