LangChain Real-Time Events
This guide adds an SSE events proxy endpoint to the LangChain tutorial app. For full details on event kinds, format, and connection lifecycle, see Real-Time Events.
Prerequisites
Starting checkpoint: python/examples/langchain/doc-checkpoints/03-with-history
SSE Events Proxy
Checkpoint 03b adds an events proxy endpoint that forwards SSE from Memory Service to the frontend:
app.py
from __future__ import annotations
import os
from typing import Any
import json
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import PlainTextResponse
from langchain.agents import create_agent
from langchain_openai import ChatOpenAI
from memory_service_langchain import (
MemoryServiceCheckpointSaver,
MemoryServiceHistoryMiddleware,
MemoryServiceProxy,
install_fastapi_authorization_middleware,
memory_service_scope,
to_fastapi_response,
)
from starlette.responses import StreamingResponse
def extract_assistant_text(result: Any) -> str:
if not isinstance(result, dict):
return str(result)
messages = result.get("messages")
if not messages:
return str(result)
message = messages[-1]
text = getattr(message, "text", None)
if isinstance(text, str) and text:
return text
content = getattr(message, "content", "")
if isinstance(content, str):
return content
return str(content)
openai_base_url = os.getenv("OPENAI_BASE_URL")
if openai_base_url and not openai_base_url.rstrip("/").endswith("/v1"):
openai_base_url = openai_base_url.rstrip("/") + "/v1"
if openai_base_url:
os.environ.setdefault("OPENAI_API_BASE", openai_base_url)
model = ChatOpenAI(
model=os.getenv("OPENAI_MODEL", "gpt-4o"),
openai_api_base=openai_base_url,
api_key=os.getenv("OPENAI_API_KEY", "not-needed-for-tests"),
)
checkpointer = MemoryServiceCheckpointSaver.from_env()
history_middleware = MemoryServiceHistoryMiddleware.from_env()
agent = create_agent(
model=model,
tools=[],
checkpointer=checkpointer,
middleware=[history_middleware],
system_prompt="You are a Python memory-service demo agent.",
)
app = FastAPI(title="Python LangChain Agent With Conversation History and Events")
@app.get("/ready")
async def ready() -> dict[str, str]:
return {"status": "ok"}
install_fastapi_authorization_middleware(app)
proxy = MemoryServiceProxy.from_env()
@app.post("/chat/{conversation_id}")
async def chat(conversation_id: str, request: Request) -> PlainTextResponse:
user_message = (await request.body()).decode("utf-8").strip()
if not user_message:
raise HTTPException(400, "message is required")
with memory_service_scope(conversation_id):
result = agent.invoke(
{"messages": [{"role": "user", "content": user_message}]},
{"configurable": {"thread_id": conversation_id}},
)
response_text = extract_assistant_text(result)
return PlainTextResponse(response_text)
@app.get("/v1/conversations/{conversation_id}")
async def get_conversation(conversation_id: str):
response = await proxy.get_conversation(conversation_id)
return to_fastapi_response(response)
@app.get("/v1/conversations/{conversation_id}/entries")
async def get_entries(conversation_id: str, request: Request):
response = await proxy.list_conversation_entries(
conversation_id,
after_cursor=request.query_params.get("afterCursor"),
limit=int(limit) if (limit := request.query_params.get("limit")) is not None else None,
channel="history",
)
return to_fastapi_response(response)
@app.get("/v1/conversations")
async def list_conversations(request: Request):
response = await proxy.list_conversations(
mode=request.query_params.get("mode"),
after_cursor=request.query_params.get("afterCursor"),
limit=int(limit) if (limit := request.query_params.get("limit")) is not None else None,
query=request.query_params.get("query"),
)
return to_fastapi_response(response)
# SSE events proxy — streams real-time events from Memory Service
@app.get("/v1/events")
async def events(request: Request):
kinds = request.query_params.get("kinds")
async def generate():
async for event in proxy.stream_events(kinds=kinds):
yield f"data: {json.dumps(event)}\n\n"
return StreamingResponse(generate(), media_type="text/event-stream") Why proxy? The agent app sits between the frontend and Memory Service. Proxying the SSE stream lets the app forward the caller’s Bearer token for authorization while injecting the agent’s API key for service authentication. Frontends never talk directly to Memory Service.
Connecting
Subscribe to events through the agent app:
curl -N -H "Authorization: Bearer $(get-token)" \
http://localhost:9090/v1/events
Filter to specific event kinds:
curl -N -H "Authorization: Bearer $(get-token)" \
"http://localhost:9090/v1/events?kinds=conversation,entry"
Next Steps
- Real-Time Events — full reference for event kinds, format, and connection lifecycle
- Service Configuration — event bus settings