Real-Time Events

Memory Service events support two different consumption models:

  • Frontend cache invalidation: subscribe to the live stream, treat each event as a hint, and refetch the affected resource.
  • Reliable event delivery: when the outbox is enabled and the datastore supports replay, reconnect with a durable cursor and continue from the last processed event.

Those two models use the same event kinds, but they are configured and operated differently. The biggest difference is what the consumer persists.

Choose the Right Model

Use case: Frontend cache invalidation
  • Recommended options: GET /v1/events?detail=summary
  • What the consumer stores: usually no durable event state; normal UI cache/query state only.
  • Recovery model: on disconnect, stream/invalidate, or uncertainty, reconnect and refetch visible data.

Use case: Reliable event delivery
  • Recommended options: GET /v1/events?after=<cursor>&detail=summary or detail=full
  • What the consumer stores: the last fully processed opaque cursor, stored durably alongside the consumer’s own projection/checkpoint state.
  • Recovery model: handle stream/phase transitions between replay and live. If recovery falls beyond retention or the stream closes, resume with after=<cursor> or rebuild from source-of-record.

Use the frontend model when your UI already has REST reads and just needs a push signal to invalidate cached queries.

Use the reliable model when a worker, projector, or integration must avoid missing events across reconnects. That path requires:

  • MEMORY_SERVICE_OUTBOX_ENABLED=true
  • A datastore that implements durable replay for the event outbox
  • Durable storage on the consumer side for the last processed cursor

Event Format

Each SSE message contains a JSON envelope on a single data: line:

data: {"event":"created","kind":"entry","data":{"conversation":"<uuid>","conversation_group":"<uuid>","entry":"<uuid>"},"cursor":"<opaque-cursor>"}

The envelope fields are:

  • event: action name
  • kind: resource type
  • data: kind-specific payload
  • cursor: opaque durable replay position when the event came from an outbox-backed stream

A keepalive comment : keepalive is sent every 30 seconds to keep the connection open.
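The envelope can be decoded with a few lines of code. A minimal sketch in Python, assuming raw SSE lines are already available as strings; parse_sse_line is a hypothetical helper, not part of any client library:

```python
import json

def parse_sse_line(line: str):
    """Parse one SSE line: return the JSON envelope, or None for
    keepalive comments and blank separator lines."""
    line = line.rstrip("\n")
    if not line or line.startswith(":"):  # ": keepalive" comments
        return None
    if line.startswith("data: "):
        return json.loads(line[len("data: "):])
    return None  # other SSE fields (event:, id:) are not used by this stream

raw = ('data: {"event":"created","kind":"entry",'
       '"data":{"conversation":"c1","conversation_group":"g1","entry":"e1"},'
       '"cursor":"abc123"}')
envelope = parse_sse_line(raw)
assert envelope["event"] == "created" and envelope["kind"] == "entry"
assert parse_sse_line(": keepalive") is None
```

Keepalive comments are dropped before JSON parsing, so the handler only ever sees complete envelopes.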

Action Names

Business events now use normalized actions:

  • created
  • updated
  • deleted

Stream-control events still use:

  • phase
  • evicted
  • invalidate

For response-recording lifecycle events, look at both event and data.status:

  • {"event":"created","kind":"response","data":{"status":"started",...}}
  • {"event":"deleted","kind":"response","data":{"status":"completed",...}}
  • {"event":"deleted","kind":"response","data":{"status":"failed",...}}
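A consumer can fold the action name and data.status into a single lifecycle stage. A minimal sketch; response_lifecycle and the stage names are hypothetical:

```python
def response_lifecycle(envelope: dict) -> str:
    """Map a response event to a lifecycle stage using both the
    action name (event) and data.status, as described above."""
    if envelope.get("kind") != "response":
        raise ValueError("not a response event")
    event, status = envelope["event"], envelope["data"]["status"]
    if event == "created" and status == "started":
        return "recording-started"
    if event == "deleted" and status in ("completed", "failed"):
        return f"recording-{status}"
    return "unknown"

assert response_lifecycle(
    {"event": "created", "kind": "response", "data": {"status": "started"}}
) == "recording-started"
assert response_lifecycle(
    {"event": "deleted", "kind": "response", "data": {"status": "failed"}}
) == "recording-failed"
```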

Detail Modes

The event stream supports two payload modes:

  • detail=summary: default. data contains identifiers and a small amount of routing metadata.
  • detail=full: the server enriches business events before sending them.

With detail=full:

  • conversation events carry the full conversation payload.
  • entry events carry the full entry payload.
  • response events are unchanged from summary; they keep the compact conversation, conversation_group, recording, and status fields.
  • stream events are never enriched.

For frontend cache invalidation, summary is usually the right choice because the client already knows how to refetch.

For reliable consumers, choose:

  • summary if the consumer wants small events and will fetch details itself.
  • full if the consumer wants to project directly from enriched conversation and entry events and can tolerate larger payloads.

Event Kinds

The following shows the default detail=summary payload shape for each kind and event.

  • conversation / created — data: conversation, conversation_group. New conversation visible to the user.
  • conversation / updated — data: conversation, conversation_group. Title or metadata changed.
  • conversation / deleted — data: conversation, conversation_group, members. members may be included so affected users can be targeted.
  • entry / created — data: conversation, conversation_group, entry. New entry appended.
  • response / created — data: conversation, conversation_group, recording, status. status is currently started.
  • response / deleted — data: conversation, conversation_group, recording, status. status is currently completed or failed.
  • membership / created — data: conversation_group, user, role. Access granted.
  • membership / updated — data: conversation_group, user, role. Access level changed.
  • membership / deleted — data: conversation_group, user. Access removed.
  • stream / phase — data: phase. Stream phase marker; phase is replay or live. Tail-only streams start at live. Replay streams emit replay before catch-up and live after they reach the tail.
  • stream / evicted — data: reason. Connection closed, often for slow-consumer or replacement reasons.
  • stream / invalidate — data: reason. Caller must assume events may have been missed.

Frontend Cache Invalidation

This is the existing UI-oriented pattern.

Subscribe without after and keep the handler simple:

curl -N -H "Authorization: Bearer $(get-token)" \
  "http://localhost:9090/v1/events?detail=summary"

Typical frontend behavior:

  • Filter with kinds=... if you only need a subset.
  • On conversation, entry, response, or membership, invalidate the corresponding query cache and refetch.
  • On stream/invalidate, do a broad cache refresh.
  • On disconnect or stream/evicted, reconnect and refetch the visible data set.

In this model, the frontend usually stores no durable event position. It stores normal application state, not a replay cursor.
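The behavior above can be sketched as a small dispatch function. This is a hypothetical outline: invalidate() stands in for whatever invalidation call your query-cache library provides, and the cache-key shapes are illustrative only:

```python
# Hypothetical cache-invalidation dispatch for a frontend (or a thin BFF).
invalidated = []

def invalidate(*keys):
    """Stand-in for a query-cache invalidation call."""
    invalidated.extend(keys)

def on_event(envelope: dict):
    kind, data = envelope["kind"], envelope["data"]
    if kind == "stream":
        if envelope["event"] == "invalidate":
            invalidate("*")  # broad refresh: events may have been missed
        return
    if kind in ("conversation", "entry", "response"):
        invalidate(("conversation", data["conversation"]))
    if kind == "membership":
        invalidate(("memberships", data["conversation_group"]))

on_event({"event": "created", "kind": "entry",
          "data": {"conversation": "c1", "conversation_group": "g1", "entry": "e1"}})
assert invalidated == [("conversation", "c1")]
```

Note that no cursor is stored anywhere: the only state is the cache keys the UI already manages.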

Reliable Event Delivery

This is the replayable consumer model for workers and downstream projections.

Start by storing the last fully processed event cursor in your own durable store. That cursor is opaque. Do not parse it, sort it, or synthesize your own.

Then subscribe with that cursor:

curl -N -H "Authorization: Bearer $(get-token)" \
  "http://localhost:9090/v1/events?after=${LAST_CURSOR}&detail=full"

Consumer rules for this mode:

  • Persist the new cursor only after your own side effects or projection updates succeed.
  • Keep the cursor with the state it protects, so recovery is atomic from the consumer’s point of view.
  • Watch for stream/phase events if you need to know whether the stream is replaying historical events or has reached the live tail.
  • Treat stream/invalidate as a loss-of-guarantee signal.
  • If the server reports that the cursor is beyond retention, rebuild from source-of-record and restart from a fresh checkpoint.
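The first two rules can be sketched with SQLite, where the cursor and the projection it protects commit in one transaction. apply_event and the table names are hypothetical; the envelope shape matches the summary entry event above:

```python
import sqlite3

# The cursor and the projection it protects live in the same database,
# so they commit (or roll back) together.
db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE checkpoint (id INTEGER PRIMARY KEY CHECK (id = 1), cursor TEXT)")
db.execute("CREATE TABLE projection (entry TEXT PRIMARY KEY, conversation TEXT)")

def apply_event(envelope: dict):
    """Apply one entry event and advance the cursor atomically."""
    with db:  # single transaction: projection update + cursor move
        if envelope["kind"] == "entry" and envelope["event"] == "created":
            d = envelope["data"]
            db.execute("INSERT OR REPLACE INTO projection VALUES (?, ?)",
                       (d["entry"], d["conversation"]))
        db.execute("INSERT OR REPLACE INTO checkpoint VALUES (1, ?)",
                   (envelope["cursor"],))

apply_event({"event": "created", "kind": "entry", "cursor": "cur-1",
             "data": {"entry": "e1", "conversation": "c1"}})
(last_cursor,) = db.execute("SELECT cursor FROM checkpoint").fetchone()
assert last_cursor == "cur-1"
```

If the process crashes between the side effect and the cursor write, both roll back, so the consumer replays the event rather than skipping it.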

If after is rejected, the server returns an explicit not-implemented error for replay. On REST SSE that is an HTTP 501 Not Implemented response before the stream opens. On gRPC it is an Unimplemented error from SubscribeEvents. In both cases, live tail-only events may still be supported even though durable replay is not.

Subscribing with curl

The SSE endpoint is:

GET /v1/events
Accept: text/event-stream
Authorization: Bearer <token>

Optional query parameters:

  • kinds=conversation,entry,... filters event kinds.
  • detail=summary|full controls payload enrichment.
  • after=<cursor> starts in replay mode and requires outbox support.

Examples:

curl -N -H "Authorization: Bearer $(get-token)" \
  "http://localhost:9090/v1/events?kinds=conversation,entry"
curl -N -H "Authorization: Bearer $(get-token)" \
  "http://localhost:9090/v1/events?after=${LAST_CURSOR}&detail=summary"

The same concepts also exist on gRPC EventStreamService.SubscribeEvents using after_cursor and detail.
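The query parameters above can also be assembled programmatically. A minimal sketch using only the Python standard library; events_url is a hypothetical helper:

```python
from urllib.parse import urlencode

def events_url(base: str, kinds=None, detail=None, after=None) -> str:
    """Build the /v1/events URL from the optional query parameters."""
    params = {}
    if kinds:
        params["kinds"] = ",".join(kinds)  # kinds is a comma-separated list
    if detail:
        params["detail"] = detail
    if after:
        params["after"] = after  # opaque cursor, passed through untouched
    query = urlencode(params, safe=",")
    return f"{base}/v1/events" + (f"?{query}" if query else "")

url = events_url("http://localhost:9090",
                 kinds=["conversation", "entry"], detail="summary")
assert url == "http://localhost:9090/v1/events?kinds=conversation,entry&detail=summary"
```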

Walkthrough

Open two terminals side by side to see summary events in real time.

Terminal 1 subscribes to the event stream:

curl -N -H "Authorization: Bearer $(get-token)" \
  "http://localhost:9090/v1/events?detail=summary"

Terminal 2 creates a conversation and adds an entry:

curl -sSfX POST http://localhost:9090/chat/e2c9a1b0-0001-4000-8000-000000000001 \
  -H "Content-Type: text/plain" \
  -H "Authorization: Bearer $(get-token)" \
  -d "Hello, this is a test."

Terminal 1 shows a conversation/created event followed by an entry/created event:

data: {"event":"phase","kind":"stream","data":{"phase":"live"}}
data: {"event":"created","kind":"conversation","data":{"conversation":"e2c9a1b0-0001-4000-8000-000000000001","conversation_group":"..."}}
data: {"event":"created","kind":"entry","data":{"conversation":"e2c9a1b0-0001-4000-8000-000000000001","conversation_group":"...","entry":"..."}}

Update the conversation title:

curl -sSfX PATCH http://localhost:9090/v1/conversations/e2c9a1b0-0001-4000-8000-000000000001 \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $(get-token)" \
  -d '{"title":"Events demo"}'

Terminal 1 receives a conversation/updated event:

data: {"event":"updated","kind":"conversation","data":{"conversation":"e2c9a1b0-0001-4000-8000-000000000001","conversation_group":"..."}}

Testing Events

Define a get-token shell helper that fetches a bearer token from the local Keycloak instance:

function get-token() {
  curl -sSfX POST http://localhost:8081/realms/memory-service/protocol/openid-connect/token \
    -H "Content-Type: application/x-www-form-urlencoded" \
    -d "client_id=memory-service-client" \
    -d "client_secret=change-me" \
    -d "grant_type=password" \
    -d "username=bob" \
    -d "password=bob" \
    | jq -r '.access_token'
}

Chat to create a conversation and trigger events:

curl -NsSfX POST http://localhost:9090/chat/e2c9a1b0-0001-4000-8000-000000000001 \
  -H "Content-Type: text/plain" \
  -H "Authorization: Bearer $(get-token)" \
  -d "Hello, this is a test."

Example output:

I am a Python memory-service demo agent.

Verify the conversation was created:

curl -sSf http://localhost:9090/v1/conversations/e2c9a1b0-0001-4000-8000-000000000001 \
  -H "Authorization: Bearer $(get-token)"

Example output:

{
  "id": "e2c9a1b0-0001-4000-8000-000000000001"
}

Verify entries exist:

curl -sSf http://localhost:9090/v1/conversations/e2c9a1b0-0001-4000-8000-000000000001/entries \
  -H "Authorization: Bearer $(get-token)"

Example output:

{"data":[...]}

Access Control

User streams are filtered by conversation membership. Callers only receive events for conversations they can read. Stream-control events bypass membership filtering because they describe the health of the subscription itself, not a specific resource.

Admin streams use /v1/admin/events, require admin or auditor access, and bypass conversation-membership filtering. A justification may be supplied for audit logging and can be required if the server enables admin justification enforcement.

Connection Lifecycle

Keepalive: a : keepalive comment is sent every 30 seconds.

Phase markers: consumers may receive stream/phase events with {"phase":"replay"} and {"phase":"live"}. Tail-only streams start with live. Replay-capable streams emit replay while catching up and live once they reach the current tail.

Connection limit: the server enforces a per-user limit on concurrent SSE connections. When a new connection is established and the user is already at the limit, the oldest local stream is evicted and receives stream/evicted with reason too many connections. The limit is configurable with MEMORY_SERVICE_SSE_MAX_CONNECTIONS_PER_USER.

Slow consumer handling: if a client falls too far behind, one of two things happens. Replay-capable SSE streams may switch back to stream/phase = replay on the same connection and catch up from the last cursor. When in-place recovery is not available, the server emits stream/evicted with reason slow consumer and closes the stream. In either case, consumers should keep their last durable cursor so they can resume if the connection is lost.

Pub/sub recovery: in multi-node deployments, Redis or PostgreSQL event-bus recovery can emit stream/invalidate with a reason such as pubsub recovery.

Retention window miss: replay consumers can also receive stream/invalidate if their stored cursor has fallen beyond outbox retention.
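For a replay consumer, the stream-control signals above can be folded into one decision function. A hedged sketch: control_action and the returned action names are hypothetical, and the mapping follows the lifecycle rules described in this section:

```python
def control_action(envelope: dict) -> str:
    """Decide how a replay consumer reacts to a stream-control event."""
    assert envelope["kind"] == "stream"
    event, data = envelope["event"], envelope["data"]
    if event == "phase":
        # replay = catching up from the cursor; live = at the tail
        return "replaying" if data["phase"] == "replay" else "live"
    if event == "evicted":
        return "reconnect-with-last-cursor"
    if event == "invalidate":
        # Guarantees lost: the cursor may be beyond retention.
        return "rebuild-from-source-of-record"
    return "ignore"

assert control_action({"event": "phase", "kind": "stream",
                       "data": {"phase": "replay"}}) == "replaying"
assert control_action({"event": "invalidate", "kind": "stream",
                       "data": {"reason": "pubsub recovery"}}) == "rebuild-from-source-of-record"
```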

Frontend Integration Tips

  • Keep the frontend path simple: treat events as hints, not as your only source of truth.
  • Prefer detail=summary for browser clients.
  • Only add durable cursor handling when you actually need replay guarantees.
  • If you proxy the event stream through your agent app, keep the proxy transparent and let the frontend own cache invalidation behavior.

Next Steps