Advanced Features

This guide continues from Conversation History and covers advanced features like forking, streaming, and response resumption.

Prerequisites

Make sure you’ve completed the previous guides:

Conversation Forking

Conversation forking lets users branch off from any point in a conversation to explore alternative paths. Add these methods to the MemoryServiceProxyController to enable forking and listing forks:

@PostMapping("/{conversationId}/messages/{messageId}/fork")
public ResponseEntity<?> forkConversationAtMessage(
        @PathVariable String conversationId,
        @PathVariable String messageId,
        @RequestBody(required = false) String body) {
    return proxy.forkConversationAtMessage(conversationId, messageId, body);
}

@GetMapping("/{conversationId}/forks")
public ResponseEntity<?> listConversationForks(@PathVariable String conversationId) {
    return proxy.listConversationForks(conversationId);
}

The fork endpoint creates a new conversation that:

  • Copies all messages up to, but not including, the specified message
  • Creates a new conversation ID for the fork
  • Links the fork back to the original conversation
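The three behaviors listed above can be pictured with a small in-memory model. This is only a sketch under assumptions: ForkSketch, Message, Conversation, and fork are hypothetical names invented for illustration, not the memory service's implementation or API. Only the forkedAtConversationId field name comes from this guide. It needs Java 16 or later for records.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Hypothetical in-memory model of forking; the real service does this server-side.
final class ForkSketch {
    record Message(String id, String content) {}

    record Conversation(
            String id, String title, String forkedAtConversationId, List<Message> messages) {}

    // Copies every message that precedes forkMessageId into a new conversation
    // with a fresh ID that links back to the original conversation.
    static Conversation fork(Conversation original, String forkMessageId, String title) {
        List<Message> copied = new ArrayList<>();
        boolean found = false;
        for (Message m : original.messages()) {
            if (m.id().equals(forkMessageId)) {
                found = true;
                break; // the fork point itself is excluded
            }
            copied.add(m);
        }
        if (!found) {
            throw new IllegalArgumentException("Unknown message id: " + forkMessageId);
        }
        return new Conversation(
                UUID.randomUUID().toString(), title, original.id(), List.copyOf(copied));
    }

    public static void main(String[] args) {
        Conversation original = new Conversation("c1", "Original", null, List.of(
                new Message("m1", "Hi"),
                new Message("m2", "Hello!"),
                new Message("m3", "Tell me a story")));
        Conversation fork = fork(original, "m3", "Alternative approach");
        System.out.println(fork.messages().size()); // prints 2
        System.out.println(fork.forkedAtConversationId()); // prints c1
    }
}
```

Under this model, forking at the first message of a conversation yields a fork with no copied messages, which gives the user a clean starting point that is still linked to the original.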

Test it with curl:

# Get the id of the first message in the conversation
FIRST_MESSAGE_ID=$(curl -sSfX GET http://localhost:9090/v1/conversations/3579aac5-c86e-4b67-bbea-6ec1a3644942/messages \
  -H "Authorization: Bearer $(get-token)" | jq -r '.data[0].id')

curl -sSfX POST http://localhost:9090/v1/conversations/3579aac5-c86e-4b67-bbea-6ec1a3644942/messages/$FIRST_MESSAGE_ID/fork \
  -H "Authorization: Bearer $(get-token)" \
  -H "Content-Type: application/json" \
  -d '{"title": "Alternative approach"}' | jq

This creates a new conversation with its own conversation ID, the title “Alternative approach”, and forkedAtConversationId set to the original conversation ID.

Streaming Responses

When users disconnect during a streaming response (for example, because of a page reload or a network issue), the response can be resumed from where they left off. Resumption requires that the agent streams its responses, so let’s first update the agent to stream with Flux<String> and Server-Sent Events (SSE).

Update ChatController.java to use streaming:

package com.example.demo;

import io.github.chirino.memoryservice.history.ConversationHistoryStreamAdvisorBuilder;
import io.github.chirino.memoryservice.memory.MemoryServiceChatMemoryRepositoryBuilder;
import io.github.chirino.memoryservice.security.SecurityHelper;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.ai.chat.client.advisor.MessageChatMemoryAdvisor;
import org.springframework.ai.chat.memory.ChatMemory;
import org.springframework.ai.chat.memory.MessageWindowChatMemory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.security.oauth2.client.OAuth2AuthorizedClientService;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ResponseStatusException;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

import java.io.IOException;

@RestController
@RequestMapping("/chat")
class ChatController {
    private final ChatClient.Builder chatClientBuilder;
    private final MemoryServiceChatMemoryRepositoryBuilder repositoryBuilder;
    private final ConversationHistoryStreamAdvisorBuilder historyAdvisorBuilder;
    private final OAuth2AuthorizedClientService authorizedClientService;

    ChatController(
            ChatClient.Builder chatClientBuilder,
            MemoryServiceChatMemoryRepositoryBuilder repositoryBuilder,
            ConversationHistoryStreamAdvisorBuilder historyAdvisorBuilder,
            ObjectProvider<OAuth2AuthorizedClientService> authorizedClientServiceProvider) {
        this.chatClientBuilder = chatClientBuilder;
        this.repositoryBuilder = repositoryBuilder;
        this.historyAdvisorBuilder = historyAdvisorBuilder;
        this.authorizedClientService = authorizedClientServiceProvider.getIfAvailable();
    }

    @PostMapping(
            path = "/{conversationId}",
            consumes = MediaType.TEXT_PLAIN_VALUE,
            produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter chat(
            @PathVariable String conversationId, @RequestBody String userMessage) {

        String bearerToken = SecurityHelper.bearerToken(authorizedClientService);
        var chatMemoryAdvisor = MessageChatMemoryAdvisor.builder(MessageWindowChatMemory.builder()
                .chatMemoryRepository(repositoryBuilder.build(bearerToken))
                .build()).build();
        var historyAdvisor = historyAdvisorBuilder.build(bearerToken);

        var chatClient = chatClientBuilder.clone()
                .defaultSystem("You are a helpful assistant.")
                .defaultAdvisors(historyAdvisor, chatMemoryAdvisor)
                .defaultAdvisors(advisor -> advisor.param(ChatMemory.CONVERSATION_ID, conversationId))
                .build();

        Flux<String> responseFlux = chatClient.prompt().user(userMessage).stream().content();

        SseEmitter emitter = new SseEmitter(0L);
        Disposable subscription = responseFlux.subscribe(
                chunk -> safeSend(emitter, chunk),
                emitter::completeWithError,
                emitter::complete);

        emitter.onCompletion(subscription::dispose);
        emitter.onTimeout(() -> {
            subscription.dispose();
            emitter.complete();
        });
        return emitter;
    }

    private void safeSend(SseEmitter emitter, String chunk) {
        try {
            emitter.send(chunk);
        } catch (IOException | IllegalStateException ignored) {
            // Client disconnected or emitter already completed
        }
    }
}

Test it with curl:

curl -NsSfX POST http://localhost:9090/chat/3579aac5-c86e-4b67-bbea-6ec1a3644942 \
  -H "Content-Type: text/plain" \
  -H "Authorization: Bearer $(get-token)" \
  -d "Write a 4 paragraph story about a cat."

You should see the response streaming to your command line.
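If you want to consume the stream from code rather than curl, the client has to split the SSE wire format back into chunks. The following is a minimal sketch of that parsing step, following the standard SSE rules (data: fields, blank-line event terminators, lines starting with a colon as comments). SseParseSketch and parseEvents are hypothetical names, and the sketch deliberately ignores the event:, id:, and retry: fields.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: extracts the data payload of each event from raw SSE text.
final class SseParseSketch {
    static List<String> parseEvents(String raw) {
        List<String> events = new ArrayList<>();
        StringBuilder data = new StringBuilder();
        boolean hasData = false;
        for (String line : raw.split("\n", -1)) {
            if (line.endsWith("\r")) {
                line = line.substring(0, line.length() - 1); // tolerate CRLF line endings
            }
            if (line.isEmpty()) {
                // A blank line terminates the current event.
                if (hasData) {
                    events.add(data.toString());
                }
                data.setLength(0);
                hasData = false;
            } else if (line.startsWith("data:")) {
                String value = line.substring("data:".length());
                if (value.startsWith(" ")) {
                    value = value.substring(1); // one optional space after the colon
                }
                if (hasData) {
                    data.append('\n'); // multiple data lines are joined with newlines
                }
                data.append(value);
                hasData = true;
            }
            // Comment lines (":...") and other fields are ignored in this sketch.
        }
        return events;
    }

    public static void main(String[] args) {
        String raw = "data:Once\n\ndata: upon\n\ndata:a\ndata:time\n\n";
        System.out.println(parseEvents(raw).size()); // prints 3
    }
}
```

An event that is not yet terminated by a blank line is not returned, so a real client would keep the unparsed remainder and retry as more bytes arrive.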

Now browse to the demo agent app at http://localhost:8080/?conversationId=3579aac5-c86e-4b67-bbea-6ec1a3644942 and you should see the response streaming to the browser.

Response Resumption

The following controller adds REST endpoints that let us:

  • Check if a conversation has a response that is in progress
  • Resume a response that is in progress
  • Cancel a response that is in progress

package com.example.demo;

import io.github.chirino.memoryservice.history.ResponseResumer;
import io.github.chirino.memoryservice.security.SecurityHelper;
import io.github.chirino.memoryservice.spring.MemoryServiceProxy;
import java.io.IOException;
import java.util.List;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.security.oauth2.client.OAuth2AuthorizedClientService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import reactor.core.Disposable;

@RestController
@RequestMapping("/v1/conversations")
class ResumeController {
    private final ResponseResumer responseResumer;
    private final MemoryServiceProxy proxy;
    private final OAuth2AuthorizedClientService authorizedClientService;

    ResumeController(
            ResponseResumer responseResumer,
            MemoryServiceProxy proxy,
            ObjectProvider<OAuth2AuthorizedClientService> authorizedClientServiceProvider) {
        this.responseResumer = responseResumer;
        this.proxy = proxy;
        this.authorizedClientService = authorizedClientServiceProvider.getIfAvailable();
    }

    @PostMapping("/resume-check")
    public List<String> check(@RequestBody List<String> conversationIds) {
        return responseResumer.check(
                conversationIds, SecurityHelper.bearerToken(authorizedClientService));
    }

    @GetMapping(path = "/{conversationId}/resume", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter resume(@PathVariable String conversationId) {
        String bearerToken = SecurityHelper.bearerToken(authorizedClientService);
        SseEmitter emitter = new SseEmitter(0L);

        Disposable subscription = responseResumer.replay(conversationId, bearerToken)
                .subscribe(
                        chunk -> safeSend(emitter, chunk),
                        emitter::completeWithError,
                        emitter::complete);

        emitter.onCompletion(subscription::dispose);
        emitter.onTimeout(() -> { subscription.dispose(); emitter.complete(); });
        return emitter;
    }

    @PostMapping("/{conversationId}/cancel")
    public ResponseEntity<?> cancelResponse(@PathVariable String conversationId) {
        return proxy.cancelResponse(conversationId);
    }

    private void safeSend(SseEmitter emitter, String chunk) {
        try {
            emitter.send(chunk);
        } catch (IOException | IllegalStateException ignored) {
            // Client disconnected or emitter already completed
        }
    }
}

Test it with curl:

curl -NsSfX POST http://localhost:9090/chat/3579aac5-c86e-4b67-bbea-6ec1a3644942 \
  -H "Content-Type: text/plain" \
  -H "Authorization: Bearer $(get-token)" \
  -d "Write a 4 paragraph story about a cat."

While the response is streaming, use the following command to check whether the conversation has a response in progress:

curl -sSfX POST http://localhost:9090/v1/conversations/resume-check \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $(get-token)" \
  -d '["3579aac5-c86e-4b67-bbea-6ec1a3644942"]' | jq

To resume the in-progress response, run the following command in a new terminal:

curl -NsSfX GET http://localhost:9090/v1/conversations/3579aac5-c86e-4b67-bbea-6ec1a3644942/resume \
  -H "Authorization: Bearer $(get-token)"

You should see the response streaming to your command line.
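Conceptually, checking, resuming, and canceling all depend on the in-progress response being recorded somewhere a new connection can reach. The sketch below illustrates that idea with a purely in-memory buffer. It is an assumption-laden illustration, not how ResponseResumer is implemented: ReplayBufferSketch and all of its methods are hypothetical, and it replays from the first chunk, whereas the real library may resume from a different position.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for the state behind check, resume, and cancel.
final class ReplayBufferSketch {
    private final List<String> chunks = new ArrayList<>();
    private final List<Consumer<String>> listeners = new ArrayList<>();
    private boolean done;

    // Records a chunk and forwards it to every attached listener.
    synchronized void emit(String chunk) {
        if (done) {
            return; // chunks after completion or cancellation are dropped
        }
        chunks.add(chunk);
        for (Consumer<String> listener : listeners) {
            listener.accept(chunk);
        }
    }

    // Marks the response as finished; also used here to model cancellation.
    synchronized void complete() {
        done = true;
        listeners.clear();
    }

    // Corresponds to the "is a response in progress?" check.
    synchronized boolean inProgress() {
        return !done;
    }

    // Replays everything recorded so far, then keeps delivering live chunks.
    synchronized void resume(Consumer<String> listener) {
        for (String chunk : chunks) {
            listener.accept(chunk);
        }
        if (!done) {
            listeners.add(listener);
        }
    }

    public static void main(String[] args) {
        ReplayBufferSketch buffer = new ReplayBufferSketch();
        buffer.emit("Once ");
        buffer.emit("upon ");
        StringBuilder seen = new StringBuilder();
        buffer.resume(seen::append); // a client reconnects mid-response
        buffer.emit("a time");
        buffer.complete();
        System.out.println(seen); // prints Once upon a time
    }
}
```

The design choice to show is that the producer writes to the buffer rather than directly to a client connection, so losing the connection does not lose the response.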

Canceling a Response

To cancel a response, you can run the following command:

curl -sSfX POST http://localhost:9090/v1/conversations/3579aac5-c86e-4b67-bbea-6ec1a3644942/cancel \
  -H "Authorization: Bearer $(get-token)"

If a response is streaming in another terminal, you should see it get canceled.

Complete Example

For a complete working example, see the spring/examples/chat-spring directory in the repository.

This example is the demo agent application that gets started by the Getting Started guide.

Next Steps