Response Recording and Resumption
This guide covers streaming responses with recording, resumption, and cancellation so users can reconnect after a disconnect and pick up where they left off. For a conceptual overview of how response recording and resumption works, including the gRPC service contract and multi-instance redirect behavior, see Response Recording and Resumption Concepts.
The route stays /response-resumption/ for link stability, but the feature name used in the Spring docs is “Response Recording and Resumption.”
Prerequisites
Starting checkpoint: This guide starts from java/spring/examples/doc-checkpoints/03-with-history
Make sure you’ve completed the previous guides:
- Getting Started - Basic memory service integration
- Conversation History - History recording and APIs
Streaming Responses
When users disconnect during a streaming response (page reload, network issues), they can resume it from where they left off. This requires the agent to stream its responses, so let's update it to produce a Flux<String> of tokens and deliver them to the client as Server-Sent Events (SSE).
Update ChatController.java to use streaming:
package com.example.demo;
import io.github.chirino.memoryservice.history.ConversationHistoryStreamAdvisorBuilder;
import io.github.chirino.memoryservice.memory.MemoryServiceChatMemoryRepositoryBuilder;
import io.github.chirino.memoryservice.security.SecurityHelper;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.ai.chat.client.ChatClientResponse;
import org.springframework.ai.chat.client.advisor.MessageChatMemoryAdvisor;
import org.springframework.ai.chat.memory.ChatMemory;
import org.springframework.ai.chat.memory.MessageWindowChatMemory;
import org.springframework.ai.chat.messages.AssistantMessage;
import org.springframework.ai.chat.model.ChatResponse;
import org.springframework.ai.chat.model.Generation;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.http.MediaType;
import org.springframework.security.oauth2.client.OAuth2AuthorizedClientService;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
@RestController
@RequestMapping("/chat")
class ChatController {
    private static final Logger LOG = LoggerFactory.getLogger(ChatController.class);
    private final ChatClient.Builder chatClientBuilder;
    private final MemoryServiceChatMemoryRepositoryBuilder repositoryBuilder;
    private final ConversationHistoryStreamAdvisorBuilder historyAdvisorBuilder;
    private final OAuth2AuthorizedClientService authorizedClientService;
    ChatController(
            ChatClient.Builder chatClientBuilder,
            MemoryServiceChatMemoryRepositoryBuilder repositoryBuilder,
            ConversationHistoryStreamAdvisorBuilder historyAdvisorBuilder,
            ObjectProvider<OAuth2AuthorizedClientService> authorizedClientServiceProvider) {
        this.chatClientBuilder = chatClientBuilder;
        this.repositoryBuilder = repositoryBuilder;
        this.historyAdvisorBuilder = historyAdvisorBuilder;
        this.authorizedClientService = authorizedClientServiceProvider.getIfAvailable();
    }
    @PostMapping(
            path = "/{conversationId}",
            consumes = MediaType.TEXT_PLAIN_VALUE,
            produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter chat(@PathVariable String conversationId, @RequestBody String userMessage) {
        String bearerToken = SecurityHelper.bearerToken(authorizedClientService);
        var chatMemoryAdvisor =
                MessageChatMemoryAdvisor.builder(
                                MessageWindowChatMemory.builder()
                                        .chatMemoryRepository(repositoryBuilder.build(bearerToken))
                                        .build())
                        .build();
        var historyAdvisor = historyAdvisorBuilder.build(bearerToken);
        var chatClient =
                chatClientBuilder
                        .clone()
                        .defaultSystem("You are a helpful assistant.")
                        .defaultAdvisors(historyAdvisor, chatMemoryAdvisor)
                        .defaultAdvisors(
                                advisor ->
                                        advisor.param(ChatMemory.CONVERSATION_ID, conversationId))
                        .build();
        Flux<String> responseFlux =
                chatClient.prompt().user(userMessage).stream()
                        .chatClientResponse()
                        .map(this::extractContent)
                        // Schedule subscription work off the request thread so the SSE response
                        // can be committed before an upstream failure is translated to HTTP 500.
                        .subscribeOn(Schedulers.boundedElastic());
        SseEmitter emitter = new SseEmitter(0L);
        Disposable subscription =
                responseFlux.subscribe(
                        chunk -> safeSendChunk(emitter, new TokenFrame(chunk)),
                        failure -> safeCompleteWithError(emitter, failure),
                        () -> safeComplete(emitter));
        emitter.onCompletion(subscription::dispose);
        emitter.onTimeout(
                () -> {
                    subscription.dispose();
                    safeComplete(emitter);
                });
        return emitter;
    }
    private void safeSendChunk(SseEmitter emitter, TokenFrame frame) {
        try {
            emitter.send(SseEmitter.event().data(frame));
        } catch (IOException | IllegalStateException ignored) {
            // Client disconnected or emitter already completed.
        }
    }
    private void safeComplete(SseEmitter emitter) {
        try {
            emitter.complete();
        } catch (IllegalStateException ignored) {
            // Emitter already completed.
        }
    }
    private void safeCompleteWithError(SseEmitter emitter, Throwable failure) {
        LOG.warn("Streaming chat failed", failure);
        try {
            emitter.completeWithError(failure);
        } catch (IllegalStateException ignored) {
            // Emitter already completed.
        }
    }
    private String extractContent(ChatClientResponse response) {
        ChatResponse payload = response.chatResponse();
        if (payload == null) {
            return "";
        }
        StringBuilder builder = new StringBuilder();
        for (Generation generation : payload.getResults()) {
            Object output = generation.getOutput();
            if (output instanceof AssistantMessage assistant) {
                String text = assistant.getText();
                if (StringUtils.hasText(text)) {
                    builder.append(text);
                }
                continue;
            }
            if (output != null) {
                builder.append(output.toString());
            }
        }
        return builder.toString();
    }
    public static final class TokenFrame {
        private final String text;
        public TokenFrame(String text) {
            this.text = text;
        }
        public String getText() {
            return text;
        }
    }
}
What changed: The return type changes from String to SseEmitter. The chat client's blocking .call().content() is replaced with .stream().chatClientResponse(), which returns a Flux<ChatClientResponse>. extractContent then maps each response to its text, producing a Flux<String>. Each chunk from that flux is sent to the SseEmitter as a TokenFrame event, and the subscription is disposed when the emitter completes or times out.
Why: Server-Sent Events (SSE) allow the browser to receive tokens as they are generated rather than waiting for the full response. This makes the app feel significantly more responsive for longer answers. The SseEmitter with a 0L timeout means the connection stays open indefinitely until the stream finishes, the client disconnects, or a cancel request arrives.
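SSE is a simple line-based format. Each event is one or more `data:` lines followed by a blank line, and the receiver joins the `data:` lines of a single event with newline characters. As a result, a chunk containing a paragraph break can appear as consecutive empty `data:` lines in the raw output. The sketch below illustrates that rule with a hypothetical `SseDataParser` helper, which is not part of Spring or the memory service. It handles only the `data` field and ignores `event:`, `id:`, and `retry:`.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical SSE helper (not part of Spring or the memory service).
 * Handles only the "data" field: data lines of one event are joined with '\n',
 * and a blank line dispatches the event. Lines after the last blank line are not dispatched.
 */
final class SseDataParser {
    private SseDataParser() {}

    static List<String> parse(List<String> lines) {
        List<String> events = new ArrayList<>();
        StringBuilder data = null; // null means no data line seen for the current event
        for (String line : lines) {
            if (line.isEmpty()) {
                // Blank line: dispatch the buffered event, if there is one.
                if (data != null) {
                    events.add(data.toString());
                    data = null;
                }
                continue;
            }
            if (line.startsWith(":")) {
                continue; // comment line, e.g. a keep-alive
            }
            if (line.startsWith("data:")) {
                String value = line.substring("data:".length());
                if (value.startsWith(" ")) {
                    value = value.substring(1); // a single leading space is stripped
                }
                if (data == null) {
                    data = new StringBuilder(value);
                } else {
                    data.append('\n').append(value);
                }
            }
            // event:, id:, and retry: fields are ignored in this sketch.
        }
        return events;
    }
}
```

A non-browser client could feed this helper the lines of a streamed response body, for example from `HttpResponse.BodyHandlers.ofLines()`.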
Test it with curl:
curl -NsSfX POST http://localhost:9090/chat/8337ca02-62de-43e1-b69b-c663cb50acc4 \
-H "Content-Type: text/plain" \
-H "Authorization: Bearer $(get-token)" \
-d "Write a 4 paragraph story about a cat."
Example output:
data:Once upon a time, there was a curious little cat named Whiskers who loved to explore the garden behind her cozy cottage. Every morning, she would stretch lazily in the sunlight before venturing out to discover what new wonders the day might bring.
data:
data:
data:One particularly bright afternoon, Whiskers stumbled upon a mysterious path she had never seen before. It wound through a thicket of wildflowers and disappeared into a shady grove of ancient oak trees.
data:
data:
data:At the end of the path, she discovered a beautiful hidden pond where dragonflies danced across the water and frogs sang their evening chorus. Whiskers sat quietly by the edge, mesmerized by the peaceful scene.
data:
data:
data:As the sun began to set, painting the sky in shades of orange and pink, Whiskers made her way back home. She curled up by the fireplace, purring contentedly, already dreaming of tomorrow's adventure at her secret pond.
You should see the response streaming to your command line.
Now browse to the demo agent app at http://localhost:8080/?conversationId=8337ca02-62de-43e1-b69b-c663cb50acc4 and you should see the response streaming to the browser.
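Response Recording and Resumption
Next, create ResumeController.java. It exposes endpoints for checking, resuming, and canceling in-progress responses: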
package com.example.demo;
import io.github.chirino.memoryservice.client.MemoryServiceProxy;
import io.github.chirino.memoryservice.history.ResponseRecordingManager;
import io.github.chirino.memoryservice.security.SecurityHelper;
import java.io.IOException;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.security.oauth2.client.OAuth2AuthorizedClientService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import reactor.core.Disposable;
@RestController
@RequestMapping("/v1/conversations")
class ResumeController {
    private static final Logger LOG = LoggerFactory.getLogger(ResumeController.class);
    private final ResponseRecordingManager recordingManager;
    private final MemoryServiceProxy proxy;
    private final OAuth2AuthorizedClientService authorizedClientService;
    ResumeController(
            ResponseRecordingManager recordingManager,
            MemoryServiceProxy proxy,
            ObjectProvider<OAuth2AuthorizedClientService> authorizedClientServiceProvider) {
        this.recordingManager = recordingManager;
        this.proxy = proxy;
        this.authorizedClientService = authorizedClientServiceProvider.getIfAvailable();
    }
    @PostMapping("/resume-check")
    public List<String> check(@RequestBody List<String> conversationIds) {
        return recordingManager.check(
                conversationIds, SecurityHelper.bearerToken(authorizedClientService));
    }
    @GetMapping(path = "/{conversationId}/resume", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter resume(@PathVariable String conversationId) {
        String bearerToken = SecurityHelper.bearerToken(authorizedClientService);
        SseEmitter emitter = new SseEmitter(0L);
        Disposable subscription =
                recordingManager
                        .replay(conversationId, bearerToken)
                        .subscribe(
                                chunk ->
                                        safeSendChunk(
                                                emitter, new ChatController.TokenFrame(chunk)),
                                failure -> safeCompleteWithError(emitter, failure),
                                () -> safeComplete(emitter));
        emitter.onCompletion(subscription::dispose);
        emitter.onTimeout(
                () -> {
                    subscription.dispose();
                    safeComplete(emitter);
                });
        return emitter;
    }
    @PostMapping("/{conversationId}/cancel")
    public ResponseEntity<?> cancelResponse(@PathVariable String conversationId) {
        return proxy.cancelResponse(conversationId);
    }
    private void safeSendChunk(SseEmitter emitter, ChatController.TokenFrame frame) {
        try {
            emitter.send(SseEmitter.event().data(frame));
        } catch (IOException | IllegalStateException ignored) {
            // Client disconnected or emitter already completed.
        }
    }
    private void safeComplete(SseEmitter emitter) {
        try {
            emitter.complete();
        } catch (IllegalStateException ignored) {
            // Emitter already completed.
        }
    }
    private void safeCompleteWithError(SseEmitter emitter, Throwable failure) {
        LOG.warn("Replay stream failed", failure);
        try {
            emitter.completeWithError(failure);
        } catch (IllegalStateException ignored) {
            // Emitter already completed.
        }
    }
}
What changed: A new ResumeController is introduced at /v1/conversations with three endpoints: POST /resume-check (batch check for in-progress responses), GET /{conversationId}/resume (SSE stream that replays the buffered response), and POST /{conversationId}/cancel (stops an in-progress response). It depends on the auto-configured ResponseRecordingManager and MemoryServiceProxy beans.
Why: When a user’s browser disconnects mid-stream (page reload, network drop), the LLM response continues generating on the server but is lost to the client. ResponseRecordingManager buffers the in-flight tokens in the Memory Service so a reconnecting client can call /resume and receive the complete response from the beginning, giving a seamless experience. The cancel endpoint lets users or the frontend interrupt a long-running generation without leaving the server in an orphaned state.
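The replay behavior described above can be modeled with a small in-memory buffer. Every chunk of an in-progress response is kept, so a client that subscribes late first receives everything recorded so far and then the live chunks. This is a hypothetical sketch for intuition only. `InMemoryRecording` and `InMemoryRecordingRegistry` are illustrative names, and the real `ResponseRecordingManager` buffers the recording in the Memory Service rather than in local memory.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical model of one recorded response (not the memory service's implementation). */
final class InMemoryRecording {
    private final List<String> chunks = new ArrayList<>();
    private final List<Consumer<String>> listeners = new ArrayList<>();
    private boolean completed;

    /** Records a chunk and forwards it to every live listener. */
    synchronized void append(String chunk) {
        if (completed) {
            throw new IllegalStateException("recording already completed");
        }
        chunks.add(chunk);
        for (Consumer<String> listener : listeners) {
            listener.accept(chunk);
        }
    }

    /** Replays everything recorded so far, then registers the listener for later chunks. */
    synchronized void replay(Consumer<String> listener) {
        chunks.forEach(listener);
        if (!completed) {
            listeners.add(listener);
        }
    }

    synchronized void complete() {
        completed = true;
        listeners.clear();
    }

    synchronized boolean inProgress() {
        return !completed;
    }
}

/** Hypothetical registry keyed by conversation ID. */
final class InMemoryRecordingRegistry {
    private final Map<String, InMemoryRecording> recordings = new ConcurrentHashMap<>();

    InMemoryRecording start(String conversationId) {
        InMemoryRecording recording = new InMemoryRecording();
        recordings.put(conversationId, recording);
        return recording;
    }

    /** Returns the subset of the given IDs that still have a response in progress. */
    List<String> check(Collection<String> conversationIds) {
        List<String> active = new ArrayList<>();
        for (String id : conversationIds) {
            InMemoryRecording recording = recordings.get(id);
            if (recording != null && recording.inProgress()) {
                active.add(id);
            }
        }
        return active;
    }
}
```

Replay and listener registration happen under the same lock. A concurrently appended chunk is therefore delivered exactly once, either as part of the replayed history or as a live chunk. `check` models the assumed semantics of `/resume-check`: it returns only the requested IDs that still have a response in progress.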
Test it with curl:
curl -NsSfX POST http://localhost:9090/chat/8337ca02-62de-43e1-b69b-c663cb50acc4 \
-H "Content-Type: text/plain" \
-H "Authorization: Bearer $(get-token)" \
-d "Write a 4 paragraph story about a cat."
While the response is streaming, run the following command to check whether the conversation has a response in progress:
curl -sSfX POST http://localhost:9090/v1/conversations/resume-check \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $(get-token)" \
-d '["8337ca02-62de-43e1-b69b-c663cb50acc4"]' | jq
To resume the in-progress response, run the following command in a new terminal:
curl -NsSfX GET http://localhost:9090/v1/conversations/8337ca02-62de-43e1-b69b-c663cb50acc4/resume \
-H "Authorization: Bearer $(get-token)"
You should see the response streaming to your command line.
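The curl commands above map directly onto `java.net.http`. A reconnecting client can follow the same two steps: ask `/resume-check` which conversations still have a response in progress, then open `/resume` for each returned ID. The sketch below only builds the requests. `ResumeRequests` is a hypothetical helper, and its hand-rolled JSON encoding assumes conversation IDs are UUIDs that need no escaping.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper that builds the same requests as the curl commands (it does not send them). */
final class ResumeRequests {
    private final URI baseUri;
    private final String bearerToken;

    ResumeRequests(URI baseUri, String bearerToken) {
        this.baseUri = baseUri;
        this.bearerToken = bearerToken;
    }

    /** POST /v1/conversations/resume-check with a JSON array of conversation IDs. */
    HttpRequest check(List<String> conversationIds) {
        // Naive JSON encoding: assumes the IDs contain no characters that need escaping.
        String json = conversationIds.stream()
                .map(id -> "\"" + id + "\"")
                .collect(Collectors.joining(",", "[", "]"));
        return HttpRequest.newBuilder(baseUri.resolve("/v1/conversations/resume-check"))
                .header("Content-Type", "application/json")
                .header("Authorization", "Bearer " + bearerToken)
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    /** GET /v1/conversations/{conversationId}/resume, expecting an SSE stream. */
    HttpRequest resume(String conversationId) {
        return HttpRequest.newBuilder(
                        baseUri.resolve("/v1/conversations/" + conversationId + "/resume"))
                .header("Accept", "text/event-stream")
                .header("Authorization", "Bearer " + bearerToken)
                .GET()
                .build();
    }
}
```

To send these requests, use an `HttpClient`. For example, `client.send(requests.check(ids), HttpResponse.BodyHandlers.ofString())` performs the check, and `client.send(requests.resume(id), HttpResponse.BodyHandlers.ofLines())` consumes the resumed SSE stream line by line.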
Canceling a Response
To cancel a response, you can run the following command:
curl -sSfX POST http://localhost:9090/v1/conversations/8337ca02-62de-43e1-b69b-c663cb50acc4/cancel \
-H "Authorization: Bearer $(get-token)"
You should see the response get canceled.
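Cancellation works cleanly only if the producing side stops at a well-defined point. One common way to model this is cooperative cancellation: the generator checks a shared flag before emitting each chunk, and a cancel request flips the flag. The sketch below is a hypothetical illustration of that idea using an `AtomicBoolean`. It is not how the memory service implements `/cancel`; the `ResumeController` above simply delegates to `MemoryServiceProxy.cancelResponse`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/** Hypothetical model of cooperative cancellation for a chunked response. */
final class CancellableGeneration {
    private final AtomicBoolean cancelled = new AtomicBoolean();

    /** Requests cancellation; returns true only for the call that actually changed the state. */
    boolean cancel() {
        return cancelled.compareAndSet(false, true);
    }

    /** Emits chunks until the source is exhausted or cancel() has been called. */
    List<String> run(Iterator<String> source, Consumer<String> sink) {
        List<String> emitted = new ArrayList<>();
        while (source.hasNext() && !cancelled.get()) {
            String chunk = source.next();
            emitted.add(chunk);
            sink.accept(chunk);
        }
        return emitted;
    }
}
```

`compareAndSet` makes `cancel()` idempotent, so repeated cancel requests are harmless and only the first reports a change. Chunks emitted before cancellation are kept, so the partial response remains available.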
Next Steps
- Conversation Sharing - Share conversations with other users.
- Docker Compose Integration - Automatic memory-service container startup for development and testing.