LangGraph Real-Time Events

This guide adds a Server-Sent Events (SSE) proxy endpoint to the LangGraph tutorial app. For full details on event kinds, format, and connection lifecycle, see Real-Time Events.

Prerequisites

Starting checkpoint: python/examples/langgraph/doc-checkpoints/03-with-history

SSE Events Proxy

Checkpoint 03b adds an events proxy endpoint that forwards the SSE stream from Memory Service to the frontend:

app.py
from __future__ import annotations

import json
import os

from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import PlainTextResponse
from langchain_openai import ChatOpenAI
from langgraph.graph import START, StateGraph
from langgraph.graph.message import MessagesState
from memory_service_langchain import (
    MemoryServiceCheckpointSaver,
    MemoryServiceHistoryMiddleware,
    MemoryServiceProxy,
    install_fastapi_authorization_middleware,
    memory_service_scope,
    to_fastapi_response,
)
from starlette.responses import StreamingResponse


def _with_v1_suffix(raw_url):
    """Return ``raw_url`` with a ``/v1`` path suffix appended when it is missing.

    Unset or empty values are returned unchanged. A value that already ends
    in ``/v1`` (ignoring trailing slashes) is also returned unchanged,
    including any trailing slash it carries.
    """
    if not raw_url:
        return raw_url
    trimmed = raw_url.rstrip("/")
    if trimmed.endswith("/v1"):
        return raw_url
    return trimmed + "/v1"


openai_base_url = _with_v1_suffix(os.getenv("OPENAI_BASE_URL"))

# Chat model client configured from the environment: OPENAI_MODEL (default
# "gpt-4o"), the base URL normalised above, and OPENAI_API_KEY (falls back to
# a placeholder string when the variable is unset).
model = ChatOpenAI(
    model=os.getenv("OPENAI_MODEL", "gpt-4o"),
    openai_api_base=openai_base_url,
    api_key=os.getenv("OPENAI_API_KEY", "not-needed-for-tests"),
)

# Memory Service integrations, each built via its from_env() constructor.
# NOTE(review): presumably these read the Memory Service connection settings
# from environment variables — confirm against memory_service_langchain.
checkpointer = MemoryServiceCheckpointSaver.from_env()
history_middleware = MemoryServiceHistoryMiddleware.from_env()


def call_model(state: MessagesState) -> dict:
    """Graph node: send the conversation to the chat model and return its reply.

    A fixed system prompt is placed in front of the messages held in
    ``state["messages"]``. The model call is routed through
    ``history_middleware.wrap_model_call`` together with the content of the
    most recent message.

    Returns:
        A state update of the form ``{"messages": [reply]}``.
    """
    conversation = state["messages"]
    prompt = [
        {"role": "system", "content": "You are a helpful assistant."},
        *conversation,
    ]
    latest_text = conversation[-1].content

    def invoke_model():
        return model.invoke(prompt)

    reply = history_middleware.wrap_model_call(latest_text, invoke_model)
    return {"messages": [reply]}


# Single-node graph: START -> call_model, using MessagesState as the state schema.
builder = StateGraph(MessagesState)
builder.add_node("call_model", call_model)
builder.add_edge(START, "call_model")
# Compile with the Memory Service checkpointer created above.
graph = builder.compile(checkpointer=checkpointer)

# FastAPI application that the route handlers below are registered on.
app = FastAPI(title="LangGraph Chatbot with Conversation History and Events")


@app.get("/ready")
async def ready() -> dict[str, str]:
    """Readiness probe: always answers with ``{"status": "ok"}``."""
    return {"status": "ok"}


# Register the Memory Service authorization middleware on the app.
# NOTE(review): presumably this captures the caller's Authorization header for
# the proxied calls below — confirm against memory_service_langchain.
install_fastapi_authorization_middleware(app)

# Proxy client used by the /v1/... routes below; built via from_env().
proxy = MemoryServiceProxy.from_env()


@app.post("/chat/{conversation_id}")
async def chat(conversation_id: str, request: Request) -> PlainTextResponse:
    """Run one chat turn for ``conversation_id`` and return the reply as plain text.

    The raw request body is taken as the user's message: it is decoded as
    UTF-8 and surrounding whitespace is stripped. The graph is invoked inside
    ``memory_service_scope(conversation_id)`` with ``conversation_id`` as the
    checkpoint ``thread_id``.

    Returns:
        The content of the last message in the graph result. Non-string
        content is converted with ``str()``.

    Raises:
        HTTPException: 400 when the body is not valid UTF-8 or is empty
            after stripping.
    """
    raw_body = await request.body()
    try:
        user_message = raw_body.decode("utf-8").strip()
    except UnicodeDecodeError:
        # Malformed client input is a client error, not an unhandled 500.
        raise HTTPException(400, "message must be valid UTF-8") from None
    if not user_message:
        raise HTTPException(400, "message is required")

    with memory_service_scope(conversation_id):
        result = await graph.ainvoke(
            {"messages": [{"role": "user", "content": user_message}]},
            config={"configurable": {"thread_id": conversation_id}},
        )

    message = result["messages"][-1]
    content = getattr(message, "content", "")
    return PlainTextResponse(content if isinstance(content, str) else str(content))


@app.get("/v1/conversations/{conversation_id}")
async def get_conversation(conversation_id: str):
    """Fetch one conversation through the Memory Service proxy.

    The proxy's response is converted with ``to_fastapi_response`` and
    returned to the caller.
    """
    return to_fastapi_response(await proxy.get_conversation(conversation_id))


@app.get("/v1/conversations/{conversation_id}/entries")
async def get_entries(conversation_id: str, request: Request):
    """List a conversation's entries on the "history" channel via the proxy.

    Query parameters:
        afterCursor: optional pagination cursor, passed through unchanged.
        limit: optional integer page size.

    Raises:
        HTTPException: 400 when ``limit`` is present but not an integer.
    """
    params = request.query_params
    raw_limit = params.get("limit")
    try:
        limit = int(raw_limit) if raw_limit is not None else None
    except ValueError:
        # A malformed query parameter is a client error, not an unhandled 500.
        raise HTTPException(400, "limit must be an integer") from None

    response = await proxy.list_conversation_entries(
        conversation_id,
        after_cursor=params.get("afterCursor"),
        limit=limit,
        channel="history",
    )
    return to_fastapi_response(response)


@app.get("/v1/conversations")
async def list_conversations(request: Request):
    """List conversations through the Memory Service proxy.

    Query parameters:
        mode: optional, passed through unchanged.
        afterCursor: optional pagination cursor, passed through unchanged.
        limit: optional integer page size.
        query: optional, passed through unchanged.

    Raises:
        HTTPException: 400 when ``limit`` is present but not an integer.
    """
    params = request.query_params
    raw_limit = params.get("limit")
    try:
        limit = int(raw_limit) if raw_limit is not None else None
    except ValueError:
        # A malformed query parameter is a client error, not an unhandled 500.
        raise HTTPException(400, "limit must be an integer") from None

    response = await proxy.list_conversations(
        mode=params.get("mode"),
        after_cursor=params.get("afterCursor"),
        limit=limit,
        query=params.get("query"),
    )
    return to_fastapi_response(response)


# SSE events proxy — streams real-time events from Memory Service
@app.get("/v1/events")
async def events(request: Request):
    """Stream events from ``proxy.stream_events`` to the caller as SSE.

    Query parameters:
        kinds: optional filter, passed through unchanged to
            ``proxy.stream_events`` (``None`` when absent).

    Each event yielded by the proxy is serialised with ``json.dumps`` and
    emitted as one ``data:`` frame terminated by a blank line.
    """
    kinds = request.query_params.get("kinds")

    async def generate():
        async for event in proxy.stream_events(kinds=kinds):
            # json.dumps without indent never emits newlines, so each event
            # fits in a single "data:" line.
            yield f"data: {json.dumps(event)}\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            # An event stream must not be cached by browsers or intermediaries.
            "Cache-Control": "no-cache",
            # Ask nginx-style reverse proxies not to buffer the stream.
            "X-Accel-Buffering": "no",
        },
    )

Why proxy? The agent app sits between the frontend and Memory Service. Proxying the SSE stream lets the app forward the caller’s Bearer token for authorization while injecting the agent’s API key for service authentication. Frontends never talk directly to Memory Service.

Connecting

Subscribe to events through the agent app:

curl -N -H "Authorization: Bearer $(get-token)" \
  http://localhost:9090/v1/events

Filter to specific event kinds:

curl -N -H "Authorization: Bearer $(get-token)" \
  "http://localhost:9090/v1/events?kinds=conversation,entry"

Next Steps