LangGraph Response Recording and Resumption

This guide covers streaming responses with recording, resumption, and cancellation so users can reconnect after a disconnect and continue where they left off.

The route stays /response-resumption/ for link stability, but the feature name used in the LangGraph docs is “Response Recording and Resumption.”

For the protocol and behavior model, see Response Recording and Resumption Concepts.

Prerequisites

Starting checkpoint: This guide starts from python/examples/langgraph/doc-checkpoints/03-with-history

Make sure you’ve completed the preceding checkpoints (this guide builds on 03-with-history).

Also complete Step 2 in LangGraph Dev Setup (build local memory-service-langchain wheel + UV_FIND_LINKS); this is temporary until the package is released.

Streaming Responses

Checkpoint 05 updates /chat/{conversation_id} to stream the response back to the client using StreamingResponse with MemoryServiceResponseRecordingManager.

New Imports in Checkpoint 05

app.py
from typing import Any

from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse, StreamingResponse
from langchain_openai import ChatOpenAI
from langgraph.graph import START, StateGraph
from langgraph.graph.message import MessagesState
from memory_service_langchain import (
    MemoryServiceCheckpointSaver,
    MemoryServiceHistoryMiddleware,
    MemoryServiceProxy,
    MemoryServiceResponseRecordingManager,
    extract_assistant_text,
    install_fastapi_authorization_middleware,
    memory_service_scope,
    to_fastapi_response,
)
What changed: JSONResponse and StreamingResponse are added from fastapi.responses, and MemoryServiceResponseRecordingManager and extract_assistant_text are imported from memory_service_langchain.

Why: StreamingResponse is needed to return text incrementally rather than buffering the full reply. MemoryServiceResponseRecordingManager tracks the in-progress stream so it can be replayed or canceled from a separate request. extract_assistant_text pulls the final assistant text out of the completed graph.ainvoke() result dict.
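To make the role of extract_assistant_text concrete, here is a hypothetical stand-in for it; the real helper is provided by memory_service_langchain and its implementation may differ. Conceptually it reads the final message out of the dict returned by graph.ainvoke():

```python
# Hypothetical stand-in for extract_assistant_text (the real helper lives in
# memory_service_langchain and may behave differently). It pulls the last
# message's text out of the dict returned by graph.ainvoke().
from typing import Any


class FakeMessage:
    """Minimal stand-in for a LangChain message object."""

    def __init__(self, content: str) -> None:
        self.content = content


def extract_assistant_text(result: dict[str, Any]) -> str:
    messages = result.get("messages", [])
    if not messages:
        return ""
    content = getattr(messages[-1], "content", "")
    return content if isinstance(content, str) else str(content)


result = {"messages": [FakeMessage("Hi!"), FakeMessage("Once upon a time...")]}
print(extract_assistant_text(result))  # -> Once upon a time...
```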

What Changes in chat(...)

Checkpoint 05 wraps the chat handler’s response in a StreamingResponse backed by MemoryServiceResponseRecordingManager. The excerpt below shows the token-extraction helper that accompanies the change:

app.py

def _extract_tokens(content: Any) -> list[str]:
    # Hypothetical header restoring the truncated snippet: collect text
    # fragments from a message content payload (string items, text-bearing
    # dicts, or dicts carrying a nested "delta").
    tokens: list[str] = []
    if isinstance(content, list):
        for item in content:
            if isinstance(item, str) and item:
                tokens.append(item)
                continue
            if not isinstance(item, dict):
                continue
            for key in ("text", "content", "value"):
                value = item.get(key)
                if isinstance(value, str) and value:
                    tokens.append(value)
                    break
            delta = item.get("delta")
            if isinstance(delta, dict):
                for key in ("text", "content", "value"):
                    value = delta.get(key)
                    if isinstance(value, str) and value:
                        tokens.append(value)
                        break
    return tokens
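The branches above cover three content shapes: a plain string item, a dict with a text-bearing key, and a dict carrying a nested delta. A condensed, self-contained restatement of the same logic (illustration only):

```python
# Condensed restatement of the extraction branches shown above; illustrative
# only, not the checkpoint's exact helper.
from typing import Any


def extract_tokens(content: Any) -> list[str]:
    tokens: list[str] = []
    if isinstance(content, list):
        for item in content:
            if isinstance(item, str) and item:
                tokens.append(item)
            elif isinstance(item, dict):
                # Check the item itself, then its optional "delta" payload.
                for source in (item, item.get("delta") or {}):
                    for key in ("text", "content", "value"):
                        value = source.get(key)
                        if isinstance(value, str) and value:
                            tokens.append(value)
                            break
    return tokens


print(extract_tokens(["Hello", {"text": "world"}, {"delta": {"content": "!"}}]))
# -> ['Hello', 'world', '!']
```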

What changed: The graph is invoked with graph.ainvoke() inside a memory_service_scope block (same as checkpoints 03/04). After the graph completes, extract_assistant_text(result) pulls out the assistant’s reply text. recording_manager.stream(conversation_id, ai_text) registers the text as a resumable stream and yields its tokens, and the endpoint returns a StreamingResponse over that stream.

Why: FastAPI middleware resets authentication context variables in its finally block before the StreamingResponse body is iterated by the server. If the graph ran inside the streaming generator instead, those variables would be None when the checkpoint saver and history middleware make calls to the memory service. Running graph.ainvoke() synchronously within the request context — before the middleware’s finally block runs — ensures authentication is available throughout the graph execution. recording_manager.stream() then provides the streaming and resumption capability without requiring auth at stream time.

Make sure you define a shell function that can get the bearer token for the bob user:

function get-token() {
  curl -sSfX POST http://localhost:8081/realms/memory-service/protocol/openid-connect/token \
    -H "Content-Type: application/x-www-form-urlencoded" \
    -d "client_id=memory-service-client" \
    -d "client_secret=change-me" \
    -d "grant_type=password" \
    -d "username=bob" \
    -d "password=bob" \
    | jq -r '.access_token'
}

Start a streaming response:

curl -NsSfX POST http://localhost:9090/chat/12b08143-3edb-4a86-a34e-e91aacc7fc02 \
  -H "Content-Type: text/plain" \
  -H "Authorization: Bearer $(get-token)" \
  -d "Write a short story about a cat."

Example output: the assistant’s reply streams back incrementally as plain text.

Response Recording and Resumption APIs

Checkpoint 05 also exposes three endpoints backed by MemoryServiceResponseRecordingManager. The excerpt below shows the surrounding app setup, including where the shared recording_manager instance is created:

app.py
    response = await model.ainvoke(messages)
    return {"messages": [response]}


builder = StateGraph(MessagesState)
builder.add_node("call_model", call_model)
builder.add_edge(START, "call_model")
graph = builder.compile(checkpointer=checkpointer)

app = FastAPI(title="LangGraph Chatbot with Response Recording and Resumption")


@app.get("/ready")
async def ready() -> dict[str, str]:
    return {"status": "ok"}


install_fastapi_authorization_middleware(app)
proxy = MemoryServiceProxy.from_env()
recording_manager = MemoryServiceResponseRecordingManager.from_env()


@app.post("/chat/{conversation_id}")
async def chat(conversation_id: str, request: Request) -> StreamingResponse:
    user_message = (await request.body()).decode("utf-8").strip()

What changed: Three endpoints are added, all backed by the shared recording_manager instance: POST /v1/conversations/resume-check, GET /v1/conversations/{conversation_id}/resume, and POST /v1/conversations/{conversation_id}/cancel.

Why: resume-check accepts a list of conversation IDs and returns which ones have an active in-progress stream — useful for a frontend polling on load to decide whether to show a “Reconnect” button. resume replays the buffered tokens from a prior stream so a disconnected client can catch up without the agent re-invoking the model. cancel signals the generator to stop and clears the buffer, freeing resources when the user navigates away.
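The semantics described above can be modeled with a small in-memory sketch. This is not the real MemoryServiceResponseRecordingManager; the class, method names, and buffering details here are illustrative only.

```python
# Toy in-memory model of the resume-check / resume / cancel semantics;
# illustrative only, not the real recording manager.
import asyncio
from collections.abc import AsyncIterator


class ToyRecorder:
    def __init__(self) -> None:
        self._buffers: dict[str, list[str]] = {}
        self._done: set[str] = set()
        self._cancelled: set[str] = set()

    async def stream(self, cid: str, text: str) -> AsyncIterator[str]:
        """Record each token as it is yielded to the original client."""
        self._buffers[cid] = []
        for token in text.split():
            if cid in self._cancelled:
                break
            self._buffers[cid].append(token)
            yield token
            await asyncio.sleep(0)
        self._done.add(cid)

    def resume_check(self, cids: list[str]) -> list[str]:
        """Which of these conversations still have an in-progress stream?"""
        return [c for c in cids if c in self._buffers and c not in self._done]

    def resume(self, cid: str) -> list[str]:
        """Replay the tokens buffered so far, without re-invoking the model."""
        return list(self._buffers.get(cid, []))

    def cancel(self, cid: str) -> None:
        """Stop the live generator and free the buffer."""
        self._cancelled.add(cid)
        self._done.add(cid)
        self._buffers.pop(cid, None)
```

A disconnected client would call resume_check on page load, resume to replay the buffered tokens, and cancel to discard a stream it no longer wants.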

Check whether a conversation has an in-progress response:

curl -sSfX POST http://localhost:9090/v1/conversations/resume-check \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $(get-token)" \
  -d '["12b08143-3edb-4a86-a34e-e91aacc7fc02"]'

Example output:

[
  "12b08143-3edb-4a86-a34e-e91aacc7fc02"
]

Resume manually from another terminal:

curl -NsSfX GET http://localhost:9090/v1/conversations/12b08143-3edb-4a86-a34e-e91aacc7fc02/resume \
  -H "Authorization: Bearer $(get-token)"

Cancel manually:

curl -sSfX POST http://localhost:9090/v1/conversations/12b08143-3edb-4a86-a34e-e91aacc7fc02/cancel \
  -H "Authorization: Bearer $(get-token)"

Completed Checkpoint

Completed code: View the full implementation at python/examples/langgraph/doc-checkpoints/05-response-resumption

Next Steps

Continue to: