LangChain Real-Time Events

This guide adds an SSE events proxy endpoint to the LangChain tutorial app. For full details on event kinds, format, and connection lifecycle, see Real-Time Events.

Prerequisites

Starting checkpoint: python/examples/langchain/doc-checkpoints/03-with-history

SSE Events Proxy

Checkpoint 03b adds an events proxy endpoint that forwards SSE from Memory Service to the frontend:

app.py
from __future__ import annotations

import os
from typing import Any

import json

from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import PlainTextResponse
from langchain.agents import create_agent
from langchain_openai import ChatOpenAI
from memory_service_langchain import (
    MemoryServiceCheckpointSaver,
    MemoryServiceHistoryMiddleware,
    MemoryServiceProxy,
    install_fastapi_authorization_middleware,
    memory_service_scope,
    to_fastapi_response,
)
from starlette.responses import StreamingResponse


def extract_assistant_text(result: Any) -> str:
    """Pull the latest assistant reply out of an agent invocation result.

    Falls back to ``str(result)`` when the result is not the expected
    ``{"messages": [...]}`` mapping, and stringifies non-string content on
    the final message.
    """
    if not isinstance(result, dict) or not result.get("messages"):
        return str(result)

    last = result["messages"][-1]

    # Prefer the message's ``.text`` accessor when it yields a non-empty string.
    candidate = getattr(last, "text", None)
    if isinstance(candidate, str) and candidate:
        return candidate

    content = getattr(last, "content", "")
    return content if isinstance(content, str) else str(content)


# Normalize the OpenAI-compatible base URL: the client expects a "/v1" suffix,
# so append it when the env var omits it.
openai_base_url = os.getenv("OPENAI_BASE_URL")
if openai_base_url and not openai_base_url.rstrip("/").endswith("/v1"):
    openai_base_url = openai_base_url.rstrip("/") + "/v1"
if openai_base_url:
    # Mirror the normalized URL into the alternate env var name, without
    # overriding a value that is already set.
    os.environ.setdefault("OPENAI_API_BASE", openai_base_url)

# Chat model backing the agent. The placeholder API key keeps local test runs
# (e.g. against a mock endpoint) from failing on a missing credential.
model = ChatOpenAI(
    model=os.getenv("OPENAI_MODEL", "gpt-4o"),
    openai_api_base=openai_base_url,
    api_key=os.getenv("OPENAI_API_KEY", "not-needed-for-tests"),
)

# Memory Service integrations, both configured from environment variables:
# the checkpoint saver persists agent state per thread, the history
# middleware records conversation history.
checkpointer = MemoryServiceCheckpointSaver.from_env()
history_middleware = MemoryServiceHistoryMiddleware.from_env()

# Tool-less LangChain agent wired to the Memory Service checkpointer and
# history middleware.
agent = create_agent(
    model=model,
    tools=[],
    checkpointer=checkpointer,
    middleware=[history_middleware],
    system_prompt="You are a Python memory-service demo agent.",
)

app = FastAPI(title="Python LangChain Agent With Conversation History and Events")


@app.get("/ready")
async def ready() -> dict[str, str]:
    """Readiness probe: always reports the service as up."""
    status_payload = {"status": "ok"}
    return status_payload
# Installed after the /ready route is defined, but FastAPI middleware wraps
# every route regardless of registration order.
install_fastapi_authorization_middleware(app)
# Shared proxy used by the Memory Service passthrough endpoints below.
proxy = MemoryServiceProxy.from_env()

@app.post("/chat/{conversation_id}")
async def chat(conversation_id: str, request: Request) -> PlainTextResponse:
    """Run one agent turn for *conversation_id* and return the reply as plain text.

    The raw request body is the user message; an empty body is rejected
    with HTTP 400.
    """
    raw_body = await request.body()
    user_message = raw_body.decode("utf-8").strip()
    if not user_message:
        raise HTTPException(400, "message is required")

    # NOTE(review): agent.invoke is synchronous and blocks the event loop for
    # the duration of the turn — acceptable for a tutorial app, revisit before
    # production use.
    with memory_service_scope(conversation_id):
        result = agent.invoke(
            {"messages": [{"role": "user", "content": user_message}]},
            {"configurable": {"thread_id": conversation_id}},
        )

    return PlainTextResponse(extract_assistant_text(result))

@app.get("/v1/conversations/{conversation_id}")
async def get_conversation(conversation_id: str):
    """Pass a single-conversation lookup straight through to Memory Service."""
    upstream = await proxy.get_conversation(conversation_id)
    return to_fastapi_response(upstream)


@app.get("/v1/conversations/{conversation_id}/entries")
async def get_entries(conversation_id: str, request: Request):
    """List history-channel entries for a conversation via Memory Service.

    Forwards the ``afterCursor`` (opaque pagination cursor) and ``limit``
    (page size) query params upstream. A non-integer ``limit`` is rejected
    with HTTP 400 instead of surfacing as an unhandled ValueError (500).
    """
    raw_limit = request.query_params.get("limit")
    try:
        limit = int(raw_limit) if raw_limit is not None else None
    except ValueError:
        raise HTTPException(400, "limit must be an integer")

    response = await proxy.list_conversation_entries(
        conversation_id,
        after_cursor=request.query_params.get("afterCursor"),
        limit=limit,
        channel="history",
    )
    return to_fastapi_response(response)


@app.get("/v1/conversations")
async def list_conversations(request: Request):
    """List conversations via Memory Service, forwarding filter/pagination params.

    Forwards ``mode``, ``afterCursor``, ``limit``, and ``query`` upstream.
    A non-integer ``limit`` is rejected with HTTP 400 instead of surfacing
    as an unhandled ValueError (500).
    """
    raw_limit = request.query_params.get("limit")
    try:
        limit = int(raw_limit) if raw_limit is not None else None
    except ValueError:
        raise HTTPException(400, "limit must be an integer")

    response = await proxy.list_conversations(
        mode=request.query_params.get("mode"),
        after_cursor=request.query_params.get("afterCursor"),
        limit=limit,
        query=request.query_params.get("query"),
    )
    return to_fastapi_response(response)


# SSE events proxy — streams real-time events from Memory Service
@app.get("/v1/events")
async def events(request: Request):
    """Proxy the Memory Service SSE event stream to the caller.

    The optional ``kinds`` query param is forwarded upstream as a
    server-side filter. Each upstream event is re-emitted as one SSE
    ``data:`` frame.
    """
    kinds = request.query_params.get("kinds")

    async def generate():
        async for event in proxy.stream_events(kinds=kinds):
            # Stop forwarding once the client has gone away so the upstream
            # subscription is not held open indefinitely.
            if await request.is_disconnected():
                break
            yield f"data: {json.dumps(event)}\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        # Advise clients and intermediaries not to cache the event stream.
        headers={"Cache-Control": "no-cache"},
    )

Why proxy? The agent app sits between the frontend and Memory Service. Proxying the SSE stream lets the app forward the caller’s Bearer token for authorization while injecting the agent’s API key for service authentication. Frontends never talk directly to Memory Service.

Connecting

Subscribe to events through the agent app:

curl -N -H "Authorization: Bearer $(get-token)" \
  http://localhost:9090/v1/events

Filter to specific event kinds:

curl -N -H "Authorization: Bearer $(get-token)" \
  "http://localhost:9090/v1/events?kinds=conversation,entry"

Next Steps