Real-Time Events
Memory Service events support two different consumption models:
- Frontend cache invalidation: subscribe to the live stream, treat each event as a hint, and refetch the affected resource.
- Reliable event delivery: when the outbox is enabled and the datastore supports replay, reconnect with a durable cursor and continue from the last processed event.
Those two models use the same event kinds, but they are configured and operated differently. The biggest difference is what the consumer persists.
Choose the Right Model
| Use case | Recommended options | What the consumer stores | Recovery model |
|---|---|---|---|
| Frontend cache invalidation | GET /v1/events?detail=summary | Usually no durable event state. Store normal UI cache/query state only. | On disconnect, stream/invalidate, or uncertainty, reconnect and refetch visible data. |
| Reliable event delivery | GET /v1/events?after=<cursor>&detail=summary or detail=full | The last fully processed opaque cursor, stored durably alongside the consumer’s own projection/checkpoint state. | Handle stream/phase transitions between replay and live. If the stream closes, resume with after=<cursor>; if the cursor falls beyond retention, rebuild from source-of-record. |
Use the frontend model when your UI already has REST reads and just needs a push signal to invalidate cached queries.
Use the reliable model when a worker, projector, or integration must avoid missing events across reconnects. That path requires:
- MEMORY_SERVICE_OUTBOX_ENABLED=true
- A datastore that implements durable replay for the event outbox
- Durable storage on the consumer side for the last processed cursor
Event Format
Each SSE message contains a JSON envelope on a single data: line:
data: {"event":"created","kind":"entry","data":{"conversation":"<uuid>","conversation_group":"<uuid>","entry":"<uuid>"},"cursor":"<opaque-cursor>"}
The envelope fields are:
- event: action name
- kind: resource type
- data: kind-specific payload
- cursor: opaque durable replay position when the event came from an outbox-backed stream
A keepalive comment : keepalive is sent every 30 seconds to keep the connection open.
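For typed consumers, the envelope maps onto a small interface. The following is a minimal TypeScript sketch based on the fields above; the EventEnvelope name and parseEventLine helper are illustrative, not part of the service API:

```typescript
// Illustrative types for the envelope documented above.
interface EventEnvelope {
  event: string;                   // action name, e.g. "created"
  kind: string;                    // resource type, e.g. "entry"
  data: Record<string, unknown>;   // kind-specific payload
  cursor?: string;                 // opaque replay position; present on outbox-backed streams
}

// Parse one SSE line; keepalive comments (": keepalive") and blank lines yield null.
function parseEventLine(line: string): EventEnvelope | null {
  if (!line.startsWith("data:")) return null;
  return JSON.parse(line.slice("data:".length)) as EventEnvelope;
}
```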
Action Names
Business events now use normalized actions:
- created
- updated
- deleted
Stream-control events still use:
- phase
- evicted
- invalidate
For response-recording lifecycle events, look at both event and data.status:
{"event":"created","kind":"response","data":{"status":"started",...}}{"event":"deleted","kind":"response","data":{"status":"completed",...}}{"event":"deleted","kind":"response","data":{"status":"failed",...}}
Detail Modes
The event stream supports two payload modes:
- detail=summary (the default): data contains identifiers and a small amount of routing metadata.
- detail=full: the server enriches business events before sending them.
With detail=full:
- conversation events carry the full conversation payload.
- entry events carry the full entry payload.
- response events are unchanged from summary; they keep the compact conversation, conversation_group, recording, and status fields.
- stream events are never enriched.
For frontend cache invalidation, summary is usually the right choice because the client already knows how to refetch.
For reliable consumers, choose:
- summary if the consumer wants small events and will fetch details itself.
- full if the consumer wants to project directly from enriched conversation and entry events and can tolerate larger payloads.
Event Kinds
This table shows the default detail=summary payload shape.
| Kind | Event | Summary data fields | Notes |
|---|---|---|---|
| conversation | created | conversation, conversation_group | New conversation visible to the user |
| conversation | updated | conversation, conversation_group | Title or metadata changed |
| conversation | deleted | conversation, conversation_group, members | members may be included so affected users can be targeted |
| entry | created | conversation, conversation_group, entry | New entry appended |
| response | created | conversation, conversation_group, recording, status | status is currently started |
| response | deleted | conversation, conversation_group, recording, status | status is currently completed or failed |
| membership | created | conversation_group, user, role | Access granted |
| membership | updated | conversation_group, user, role | Access level changed |
| membership | deleted | conversation_group, user | Access removed |
| stream | phase | phase | Stream phase marker. phase is replay or live. Tail-only streams start at live. Replay streams emit replay before catch-up and live after they reach the tail. |
| stream | evicted | reason | Connection closed, often for slow-consumer or replacement reasons |
| stream | invalidate | reason | Caller must assume events may have been missed |
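If you consume these events from TypeScript, the summary shapes in the table can be written down as a discriminated union. This is a sketch transcribed from the table, under the assumption that identifiers arrive as strings and members as a string array; the SummaryEvent name is illustrative:

```typescript
// Illustrative summary payload shapes, transcribed from the table above.
type SummaryEvent =
  | { kind: "conversation"; event: "created" | "updated";
      data: { conversation: string; conversation_group: string } }
  | { kind: "conversation"; event: "deleted";
      data: { conversation: string; conversation_group: string; members?: string[] } }
  | { kind: "entry"; event: "created";
      data: { conversation: string; conversation_group: string; entry: string } }
  | { kind: "response"; event: "created" | "deleted";
      data: { conversation: string; conversation_group: string; recording: string;
              status: "started" | "completed" | "failed" } }
  | { kind: "membership"; event: "created" | "updated";
      data: { conversation_group: string; user: string; role: string } }
  | { kind: "membership"; event: "deleted";
      data: { conversation_group: string; user: string } }
  | { kind: "stream"; event: "phase"; data: { phase: "replay" | "live" } }
  | { kind: "stream"; event: "evicted" | "invalidate"; data: { reason: string } };
```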
Frontend Cache Invalidation
This is the existing UI-oriented pattern.
Subscribe without after and keep the handler simple:
curl -N -H "Authorization: Bearer $(get-token)" \
"http://localhost:9090/v1/events?detail=summary"
Typical frontend behavior:
- Filter with kinds=... if you only need a subset.
- On conversation, entry, response, or membership, invalidate the corresponding query cache and refetch.
- On stream/invalidate, do a broad cache refresh.
- On disconnect or stream/evicted, reconnect and refetch the visible data set.
In this model, the frontend usually stores no durable event position. It stores normal application state, not a replay cursor.
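A minimal browser-side sketch of this pattern, assuming the stream is proxied same-origin (the native EventSource API cannot set an Authorization header) and assuming hypothetical invalidateQueries and refetchVisibleData helpers from your cache layer:

```typescript
// Hypothetical cache helpers; substitute your query library's calls.
declare function invalidateQueries(key: string): void;
declare function refetchVisibleData(): void;

function subscribeToEvents(): void {
  // Assumes a same-origin proxy for the stream; the native EventSource API
  // cannot attach an Authorization header itself.
  const source = new EventSource("/v1/events?detail=summary");

  source.onmessage = (msg: MessageEvent<string>) => {
    const { event, kind } = JSON.parse(msg.data);
    if (kind === "stream") {
      // invalidate means events may have been missed: refresh broadly.
      if (event === "invalidate") refetchVisibleData();
      return;
    }
    // Business event: treat it as a hint and refetch the affected queries.
    invalidateQueries(kind);
  };

  source.onerror = () => {
    // EventSource reconnects automatically; refetch visible data to cover any gap.
    refetchVisibleData();
  };
}
```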
Reliable Event Delivery
This is the replayable consumer model for workers and downstream projections.
Start by storing the last fully processed event cursor in your own durable store. That cursor is opaque. Do not parse it, sort it, or synthesize your own.
Then subscribe with that cursor:
curl -N -H "Authorization: Bearer $(get-token)" \
"http://localhost:9090/v1/events?after=${LAST_CURSOR}&detail=full"
Consumer rules for this mode:
- Persist the new cursor only after your own side effects or projection updates succeed.
- Keep the cursor with the state it protects, so recovery is atomic from the consumer’s point of view.
- Watch for stream/phase events if you need to know whether the stream is replaying historical events or has reached the live tail.
- Treat stream/invalidate as a loss-of-guarantee signal.
- If the server reports that the cursor is beyond retention, rebuild from source-of-record and restart from a fresh checkpoint.
If after is rejected, the server returns an explicit not-implemented error for replay. On REST SSE that is an HTTP 501 Not Implemented response before the stream opens. On gRPC it is an Unimplemented error from SubscribeEvents. In both cases, live tail-only events may still be supported even though durable replay is not.
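A sketch of this loop in TypeScript, assuming hypothetical loadCursor, saveCursor, and applyEvent helpers backed by the consumer's durable store; reconnection backoff and error handling are abbreviated:

```typescript
// Hypothetical durable-store helpers; keep the cursor next to the state it protects.
declare function loadCursor(): Promise<string | null>;
declare function saveCursor(cursor: string): Promise<void>;
declare function applyEvent(envelope: { event: string; kind: string; data: unknown }): Promise<void>;

async function consume(baseUrl: string, token: string): Promise<void> {
  let cursor = await loadCursor();
  for (;;) {
    const url = new URL("/v1/events", baseUrl);
    url.searchParams.set("detail", "full");
    if (cursor) url.searchParams.set("after", cursor);

    const res = await fetch(url, {
      headers: { Authorization: `Bearer ${token}`, Accept: "text/event-stream" },
    });
    if (res.status === 501) throw new Error("durable replay not supported by this deployment");

    const reader = res.body!.getReader();
    const decoder = new TextDecoder();
    let buffer = "";
    for (;;) {
      const { value, done } = await reader.read();
      if (done) break; // stream closed: loop around and resume from the saved cursor
      buffer += decoder.decode(value, { stream: true });
      let nl: number;
      while ((nl = buffer.indexOf("\n")) >= 0) {
        const line = buffer.slice(0, nl).trim();
        buffer = buffer.slice(nl + 1);
        if (!line.startsWith("data:")) continue; // keepalives and blank lines
        const envelope = JSON.parse(line.slice("data:".length));
        if (envelope.kind === "stream") {
          if (envelope.event === "invalidate") {
            throw new Error("loss-of-guarantee signal; rebuild from source-of-record");
          }
          continue; // phase markers need no projection work
        }
        await applyEvent(envelope);          // side effects first...
        if (envelope.cursor) {
          await saveCursor(envelope.cursor); // ...then persist the new cursor
          cursor = envelope.cursor;
        }
      }
    }
  }
}
```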
Subscribing with curl
The SSE endpoint is:
GET /v1/events
Accept: text/event-stream
Authorization: Bearer <token>
Optional query parameters:
- kinds=conversation,entry,... filters event kinds.
- detail=summary|full controls payload enrichment.
- after=<cursor> starts in replay mode and requires outbox support.
Examples:
curl -N -H "Authorization: Bearer $(get-token)" \
"http://localhost:9090/v1/events?kinds=conversation,entry"
curl -N -H "Authorization: Bearer $(get-token)" \
"http://localhost:9090/v1/events?after=${LAST_CURSOR}&detail=summary"
The same concepts also exist on gRPC EventStreamService.SubscribeEvents using after_cursor and detail.
Walkthrough
Open two terminals side by side to see summary events in real time.
Terminal 1 subscribes to the event stream:
curl -N -H "Authorization: Bearer $(get-token)" \
"http://localhost:9090/v1/events?detail=summary"
Terminal 2 creates a conversation and adds an entry:
curl -sSfX POST http://localhost:9090/chat/e2c9a1b0-0001-4000-8000-000000000001 \
-H "Content-Type: text/plain" \
-H "Authorization: Bearer $(get-token)" \
-d "Hello, this is a test."
Terminal 1 first shows the live phase marker, then a conversation/created event followed by an entry/created event:
data: {"event":"phase","kind":"stream","data":{"phase":"live"}}
data: {"event":"created","kind":"conversation","data":{"conversation":"e2c9a1b0-0001-4000-8000-000000000001","conversation_group":"..."}}
data: {"event":"created","kind":"entry","data":{"conversation":"e2c9a1b0-0001-4000-8000-000000000001","conversation_group":"...","entry":"..."}}
Update the conversation title:
curl -sSfX PATCH http://localhost:9090/v1/conversations/e2c9a1b0-0001-4000-8000-000000000001 \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $(get-token)" \
-d '{"title":"Events demo"}'
Terminal 1 receives a conversation/updated event:
data: {"event":"updated","kind":"conversation","data":{"conversation":"e2c9a1b0-0001-4000-8000-000000000001","conversation_group":"..."}}
Testing Events
Define a helper that fetches a token:

function get-token() {
  curl -sSfX POST http://localhost:8081/realms/memory-service/protocol/openid-connect/token \
    -H "Content-Type: application/x-www-form-urlencoded" \
    -d "client_id=memory-service-client" \
    -d "client_secret=change-me" \
    -d "grant_type=password" \
    -d "username=bob" \
    -d "password=bob" \
    | jq -r '.access_token'
}

Chat to create a conversation and trigger events:
curl -NsSfX POST http://localhost:9090/chat/e2c9a1b0-0001-4000-8000-000000000001 \
-H "Content-Type: text/plain" \
-H "Authorization: Bearer $(get-token)" \
-d "Hello, this is a test." Example output:
I am a Python memory-service demo agent.

Verify the conversation was created:
curl -sSf http://localhost:9090/v1/conversations/e2c9a1b0-0001-4000-8000-000000000001 \
-H "Authorization: Bearer $(get-token)" Example output:
{
  "id": "e2c9a1b0-0001-4000-8000-000000000001"
}

Verify entries exist:
curl -sSf http://localhost:9090/v1/conversations/e2c9a1b0-0001-4000-8000-000000000001/entries \
-H "Authorization: Bearer $(get-token)" Example output:
{"data":[...]} Access Control
User streams are filtered by conversation membership. Callers only receive events for conversations they can read. Stream-control events bypass membership filtering because they describe the health of the subscription itself, not a specific resource.
Admin streams use /v1/admin/events, require admin or auditor access, and bypass conversation-membership filtering. A justification may be supplied for audit logging and can be required if the server enables admin justification enforcement.
Connection Lifecycle
Keepalive: a : keepalive comment is sent every 30 seconds.
Phase markers: consumers may receive stream/phase events with {"phase":"replay"} and {"phase":"live"}. Tail-only streams start with live. Replay-capable streams emit replay while catching up and live once they reach the current tail.
Connection limit: the server enforces a per-user limit on concurrent SSE connections. When a new connection is established and the user is already at the limit, the oldest local stream is evicted and receives stream/evicted with reason too many connections. The limit is configurable with MEMORY_SERVICE_SSE_MAX_CONNECTIONS_PER_USER.
Slow consumer handling: if a client falls too far behind, one of two things happens. Replay-capable SSE streams may switch back to the replay phase (emitting stream/phase with replay) on the same connection and catch up from the last cursor. When in-place recovery is not available, the server emits stream/evicted with reason slow consumer and closes the stream. In either case, consumers should keep their last durable cursor so they can resume if the connection is lost.
Pub/sub recovery: in multi-node deployments, Redis or PostgreSQL event-bus recovery can emit stream/invalidate with a reason such as pubsub recovery.
Retention window miss: replay consumers can also receive stream/invalidate if their stored cursor has fallen beyond outbox retention.
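Taken together, these signals can be handled in one place. A sketch, assuming the envelope shape shown earlier; the handler name and logging are illustrative:

```typescript
// Illustrative dispatcher for the stream-control events described above.
function handleStreamEvent(event: string, data: { phase?: string; reason?: string }): void {
  switch (event) {
    case "phase":
      // "replay" while catching up from a cursor, "live" at the tail.
      console.log(`stream phase: ${data.phase}`);
      break;
    case "evicted":
      // e.g. "too many connections" or "slow consumer"; reconnect,
      // resuming from the last durable cursor if you keep one.
      console.warn(`evicted: ${data.reason}`);
      break;
    case "invalidate":
      // Events may have been missed (e.g. pubsub recovery, retention miss).
      // Frontends refetch broadly; reliable consumers rebuild or resume.
      console.warn(`invalidate: ${data.reason}`);
      break;
  }
}
```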
Frontend Integration Tips
- Keep the frontend path simple: treat events as hints, not as your only source of truth.
- Prefer detail=summary for browser clients.
- Only add durable cursor handling when you actually need replay guarantees.
- If you proxy the event stream through your agent app, keep the proxy transparent and let the frontend own cache invalidation behavior.
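If you do proxy the stream, a minimal transparent pass-through in Node-style TypeScript might look like the following; the port, upstream URL, and auth forwarding are assumptions about your agent app:

```typescript
import http from "node:http";

// Minimal transparent pass-through. Assumes the agent app listens on 3000,
// the Memory Service is at UPSTREAM, and the caller's bearer token is forwarded as-is.
const UPSTREAM = "http://localhost:9090";

http.createServer(async (req, res) => {
  if (!req.url?.startsWith("/v1/events")) {
    res.writeHead(404).end();
    return;
  }
  const upstream = await fetch(UPSTREAM + req.url, {
    headers: {
      Authorization: req.headers.authorization ?? "",
      Accept: "text/event-stream",
    },
  });
  res.writeHead(upstream.status, {
    "Content-Type": "text/event-stream",
    "Cache-Control": "no-cache",
  });
  // Pipe bytes through untouched so the frontend sees the raw event stream.
  for await (const chunk of upstream.body as any) res.write(chunk);
  res.end();
}).listen(3000);
```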
Next Steps
- See the framework-specific guides for proxying events through your agent app.
- Review Service Configuration for event bus settings and related deployment flags.