Advanced Features
This guide continues from Conversation History and covers advanced features like forking, streaming, and response resumption.
Prerequisites
Make sure you’ve completed the previous guides and have the required tools installed:
- Getting Started - Basic memory service integration
- Conversation History - History recording and APIs
- curl and jq installed
Conversation Forking
Conversation forking lets users branch off from any point in a conversation to explore alternative paths.
Add these methods to ConversationsResource.java to enable forking and listing forks:
// Forks the conversation at the given message by delegating to the proxy.
@POST
@Path("/{conversationId}/messages/{messageId}/fork")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Response forkConversationAtMessage(
        @PathParam("conversationId") String conversationId,
        @PathParam("messageId") String messageId,
        String body) {
    return proxy.forkConversationAtMessage(conversationId, messageId, body);
}

// Lists the forks of the given conversation.
@GET
@Path("/{conversationId}/forks")
@Produces(MediaType.APPLICATION_JSON)
public Response listConversationForks(@PathParam("conversationId") String conversationId) {
    return proxy.listConversationForks(conversationId);
}
The fork endpoint creates a new conversation that:
- Copies all messages up to, but excluding, the specified message
- Creates a new conversation ID for the fork
- Links the fork back to the original conversation
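The copy rule in the list above can be illustrated with a small in-memory model. This is only a sketch: the `ForkSketch` class, its `Message` and `Conversation` records, and the `fork` helper are hypothetical names used for illustration and are not part of the memory service API. The real fork is performed by the service.

```java
import java.util.List;
import java.util.UUID;

// Hypothetical in-memory model; the real fork is performed by the memory service.
final class ForkSketch {

    record Message(String id, String content) {}

    // forkedAtConversationId mirrors the field name returned by the fork endpoint.
    record Conversation(String id, String forkedAtConversationId, List<Message> messages) {}

    // Copies every message that precedes messageId; messageId itself is excluded.
    static Conversation fork(Conversation original, String messageId) {
        int index = -1;
        for (int i = 0; i < original.messages().size(); i++) {
            if (original.messages().get(i).id().equals(messageId)) {
                index = i;
                break;
            }
        }
        if (index < 0) {
            throw new IllegalArgumentException("Unknown message: " + messageId);
        }
        List<Message> copied = List.copyOf(original.messages().subList(0, index));
        // The fork gets a fresh ID and a link back to the original conversation.
        return new Conversation(UUID.randomUUID().toString(), original.id(), copied);
    }

    public static void main(String[] args) {
        Conversation original = new Conversation("c1", null, List.of(
                new Message("m1", "Hello"),
                new Message("m2", "Hi, how can I help?"),
                new Message("m3", "Tell me a story")));

        System.out.println(fork(original, "m3").messages().size()); // prints 2
        System.out.println(fork(original, "m1").messages().size()); // prints 0
    }
}
```

Note the second call: forking at the first message yields an empty history, because the message you fork at is never copied.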
Test it with curl:
# get the id of the first message in the conversation
FIRST_MESSAGE_ID=$(curl -sSfX GET http://localhost:9090/v1/conversations/3579aac5-c86e-4b67-bbea-6ec1a3644942/messages \
-H "Authorization: Bearer $(get-token)" | jq -r '.data[0].id')
curl -sSfX POST http://localhost:9090/v1/conversations/3579aac5-c86e-4b67-bbea-6ec1a3644942/messages/$FIRST_MESSAGE_ID/fork \
-H "Authorization: Bearer $(get-token)" \
-H "Content-Type: application/json" \
-d '{"title": "Alternative approach"}' | jq
This creates a new conversation whose forkedAtConversationId is 3579aac5-c86e-4b67-bbea-6ec1a3644942 and whose title is “Alternative approach”. The fork gets its own new conversation ID.
If you browse to http://localhost:8080/?conversationId=da597b28-5ccd-4900-92e9-0aec57394523, you will see that the first message now has a fork. If you select that fork, its conversation history is empty: the message you fork at is not copied into the fork, and we forked at the first message of the original conversation.
Streaming Responses
When users disconnect during a streaming response (page reload, network issues), you can resume the response from where they left off. This requires the agent to stream its responses, so let’s update the agent to return Multi<String> instead of String.
Update Agent.java to use streaming responses:
package org.acme;

import dev.langchain4j.service.MemoryId;
import io.quarkiverse.langchain4j.RegisterAiService;
import io.smallrye.mutiny.Multi;
import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
@RegisterAiService
public interface Agent {
    // Returns a stream of response chunks instead of a single String.
    Multi<String> chat(@MemoryId String conversationId, String userMessage);
}
Update HistoryRecordingAgent.java to use streaming responses:
package org.acme;

import io.github.chirino.memory.history.annotations.ConversationId;
import io.github.chirino.memory.history.annotations.RecordConversation;
import io.github.chirino.memory.history.annotations.UserMessage;
import io.smallrye.mutiny.Multi;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

@ApplicationScoped
public class HistoryRecordingAgent {

    private final Agent agent;

    @Inject
    public HistoryRecordingAgent(Agent agent) {
        this.agent = agent;
    }

    // Delegates to the agent and passes its response stream through.
    @RecordConversation
    public Multi<String> chat(@ConversationId String conversationId, @UserMessage String userMessage) {
        return agent.chat(conversationId, userMessage);
    }
}
Update ChatResource.java to use Server-Sent Events (SSE) to stream the responses to the client:
package org.acme;

import io.smallrye.mutiny.Multi;
import jakarta.inject.Inject;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

@Path("/chat")
public class ChatResource {

    @Inject HistoryRecordingAgent agent;

    // Each chunk emitted by the agent is sent to the client as a Server-Sent Event.
    @POST
    @Path("/{conversationId}")
    @Consumes(MediaType.TEXT_PLAIN)
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public Multi<String> chat(
            @PathParam("conversationId") String conversationId, String userMessage) {
        return agent.chat(conversationId, userMessage);
    }
}
Test it with curl:
curl -NsSfX POST http://localhost:9090/chat/3579aac5-c86e-4b67-bbea-6ec1a3644942 \
-H "Content-Type: text/plain" \
-H "Authorization: Bearer $(get-token)" \
-d "Write a 4 paragraph story about a cat."
You should see the response streaming to your command line.
Now browse to the demo agent app at http://localhost:8080/?conversationId=3579aac5-c86e-4b67-bbea-6ec1a3644942 and you should see the response streaming to the browser.
Response Resumption
The following ResumeResource.java adds REST endpoints that let us:
- Check if a conversation has a response that is in progress
- Resume a response that is in progress
- Cancel a response that is in progress
package org.acme;

import io.github.chirino.memory.history.runtime.ResponseResumer;
import io.github.chirino.memory.runtime.MemoryServiceProxy;
import io.quarkus.security.identity.SecurityIdentity;
import io.smallrye.common.annotation.Blocking;
import io.smallrye.mutiny.Multi;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import java.util.List;

import static io.github.chirino.memory.security.SecurityHelper.bearerToken;

@Path("/v1/conversations")
@ApplicationScoped
public class ResumeResource {

    @Inject ResponseResumer resumer;
    @Inject SecurityIdentity securityIdentity;
    @Inject MemoryServiceProxy proxy;

    // Checks the given conversations for responses that are in progress.
    @POST
    @Produces(MediaType.APPLICATION_JSON)
    @Consumes(MediaType.APPLICATION_JSON)
    @Path("/resume-check")
    public List<String> check(List<String> conversationIds) {
        return resumer.check(conversationIds, bearerToken(securityIdentity));
    }

    // Resumes an in-progress response as a Server-Sent Events stream.
    @GET
    @Path("/{conversationId}/resume")
    @Blocking
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public Multi<String> resume(
            @PathParam("conversationId") String conversationId) {
        String bearerToken = bearerToken(securityIdentity);
        return resumer.replay(conversationId, bearerToken);
    }

    // Cancels an in-progress response.
    @POST
    @Path("/{conversationId}/cancel")
    public Response cancelResponse(@PathParam("conversationId") String conversationId) {
        return proxy.cancelResponse(conversationId);
    }
}
Test it with curl. First, start a streaming response:
curl -NsSfX POST http://localhost:9090/chat/3579aac5-c86e-4b67-bbea-6ec1a3644942 \
-H "Content-Type: text/plain" \
-H "Authorization: Bearer $(get-token)" \
-d "Write a 4 paragraph story about a cat."
While the response is streaming, use the following command to check whether the conversation has a response in progress:
curl -sSfX POST http://localhost:9090/v1/conversations/resume-check \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $(get-token)" \
-d '["3579aac5-c86e-4b67-bbea-6ec1a3644942"]' | jq
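The same check can be issued from Java. The following is a minimal sketch that builds the equivalent request with the JDK's `java.net.http` API. The `ResumeCheckRequest` class and its helpers are hypothetical names, the base URL matches the curl example, and obtaining the bearer token is left to the caller.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper; it only builds the request and does not send it.
final class ResumeCheckRequest {

    // baseUrl and token are supplied by the caller, e.g. "http://localhost:9090".
    static HttpRequest build(String baseUrl, String token, List<String> conversationIds) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/v1/conversations/resume-check"))
                .header("Content-Type", "application/json")
                .header("Authorization", "Bearer " + token)
                .POST(HttpRequest.BodyPublishers.ofString(toJsonArray(conversationIds)))
                .build();
    }

    // Minimal JSON array encoding; assumes the IDs are UUIDs and need no escaping.
    static String toJsonArray(List<String> ids) {
        return ids.stream()
                .map(id -> "\"" + id + "\"")
                .collect(Collectors.joining(",", "[", "]"));
    }

    public static void main(String[] args) {
        List<String> ids = List.of("3579aac5-c86e-4b67-bbea-6ec1a3644942");
        HttpRequest request = build("http://localhost:9090", "my-token", ids);
        System.out.println(request.method() + " " + request.uri());
        System.out.println(toJsonArray(ids));
    }
}
```

To send the request, pass it to `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`. For anything beyond UUIDs, a JSON library is a safer way to encode the body.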
To resume the in-progress response, run the following command in a new terminal:
curl -NsSfX GET http://localhost:9090/v1/conversations/3579aac5-c86e-4b67-bbea-6ec1a3644942/resume \
-H "Authorization: Bearer $(get-token)"
You should see the response streaming to your command line.
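The resume endpoint produces a Server-Sent Events stream, so a client other than curl has to split that stream into events. The sketch below shows one way to do this in Java. The `SseDataParser` class is a hypothetical name, and it handles only `data:` fields: comment lines and the `event`, `id`, and `retry` fields are ignored.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical helper that extracts the data payload of each SSE event.
final class SseDataParser {

    static List<String> parse(Stream<String> lines) {
        List<String> events = new ArrayList<>();
        List<String> dataLines = new ArrayList<>();
        Iterator<String> it = lines.iterator();
        while (it.hasNext()) {
            String line = it.next();
            if (line.isEmpty()) {
                // A blank line ends the current event.
                if (!dataLines.isEmpty()) {
                    events.add(String.join("\n", dataLines));
                    dataLines.clear();
                }
            } else if (line.startsWith("data:")) {
                String value = line.substring("data:".length());
                // A single leading space after the colon is not part of the value.
                dataLines.add(value.startsWith(" ") ? value.substring(1) : value);
            }
            // Comment lines (starting with ':') and other fields are ignored here.
        }
        return events;
    }

    public static void main(String[] args) {
        String stream = "data: Once\n\ndata: upon\n\n: keep-alive\n\ndata: a time\n\n";
        System.out.println(parse(stream.lines())); // prints [Once, upon, a time]
    }
}
```

Because the parser takes a `Stream<String>`, it could be fed the body of a `java.net.http.HttpClient` response obtained with `HttpResponse.BodyHandlers.ofLines()`. This version collects events until the stream ends; a real client would more likely hand each event to a callback as it arrives.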
Canceling a Response
To cancel a response, you can run the following command:
curl -sSfX POST http://localhost:9090/v1/conversations/3579aac5-c86e-4b67-bbea-6ec1a3644942/cancel \
-H "Authorization: Bearer $(get-token)"
You should see the response get canceled.
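To make the check, resume, and cancel operations concrete, here is a conceptual in-memory model of tracking in-progress responses. It is not how ResponseResumer is implemented, which this guide does not describe. The `InProgressResponses` class and its methods are hypothetical, and the sketch assumes that a check returns the subset of conversation IDs that have a response in progress, as the `List<String>` return type suggests.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Conceptual model only; not the memory service's ResponseResumer.
final class InProgressResponses {

    // Chunks emitted so far, keyed by conversation ID.
    private final Map<String, List<String>> buffers = new HashMap<>();

    // Called by the producer for every chunk it streams.
    synchronized void append(String conversationId, String chunk) {
        buffers.computeIfAbsent(conversationId, id -> new ArrayList<>()).add(chunk);
    }

    // Called when a response completes or is canceled.
    synchronized void finish(String conversationId) {
        buffers.remove(conversationId);
    }

    // Returns the IDs that currently have a response in progress.
    synchronized List<String> check(List<String> conversationIds) {
        List<String> inProgress = new ArrayList<>();
        for (String id : conversationIds) {
            if (buffers.containsKey(id)) {
                inProgress.add(id);
            }
        }
        return inProgress;
    }

    // Returns a snapshot of the chunks buffered so far for a reconnecting client.
    synchronized List<String> replay(String conversationId) {
        return List.copyOf(buffers.getOrDefault(conversationId, List.of()));
    }

    public static void main(String[] args) {
        InProgressResponses responses = new InProgressResponses();
        responses.append("c1", "Once");
        responses.append("c1", "upon");

        System.out.println(responses.check(List.of("c1", "c2"))); // prints [c1]
        System.out.println(responses.replay("c1"));               // prints [Once, upon]

        responses.finish("c1");
        System.out.println(responses.check(List.of("c1", "c2"))); // prints []
    }
}
```

A real implementation would also keep streaming new chunks to the resumed client after replaying the buffered ones, which this model leaves out.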
Complete Example
For a complete working example, see the examples/chat-quarkus directory in the repository.
This example is the demo agent application that gets started by the Getting Started guide.
Next Steps
- Dev Services - Automatic memory-service container startup for development and testing.