Kafka Messaging Architecture

All topic names follow one of two approved patterns.

Primary (default)

vh.<env>.<domain>.<stream>.<event>.<v#>

Extended (optional agent / tenant)

vh.<env>.<vhId>.<domain>.<stream>.<event>.<v#>

Definitions

Field      Description
vh         Identifies the platform.
<env>      The environment: dev, test, or prod.
<vhId>     Optional Virtual Human identifier (for example vh42). Use only if you want separate topics per agent.
<domain>   Owning area (for example voice, affect, mcp, llm, embody, context).
<stream>   Type of data: raw, proc, req, res, evt, state, cmd, dlq.
<event>    Short subject name (for example voice-to-text, infer, prompt, text).
<v#>       Breaking topic version (for example v1).

Simple Rule

Use Primary unless you need strong isolation per agent.

If you use Extended, apply it to all topics in that environment.

Examples

Primary

vh.prod.affect.res.infer.v1

Extended

vh.prod.vh42.affect.res.infer.v1
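
The two patterns can be checked mechanically. The following is a minimal sketch, not an official helper: the function names build_topic and parse_topic are hypothetical, and the allowed character set for each segment (lowercase letters, digits, hyphens) is an assumption inferred from the examples above.

```python
import re
from typing import Optional

ENVS = {"dev", "test", "prod"}
STREAMS = {"raw", "proc", "req", "res", "evt", "state", "cmd", "dlq"}
# Assumption: each segment is lowercase letters, digits, and hyphens.
SEGMENT = re.compile(r"[a-z0-9][a-z0-9-]*")


def build_topic(env: str, domain: str, stream: str, event: str,
                version: int, vh_id: Optional[str] = None) -> str:
    """Build a Primary topic name, or an Extended one when vh_id is given."""
    if env not in ENVS:
        raise ValueError(f"unknown env: {env!r}")
    if stream not in STREAMS:
        raise ValueError(f"unknown stream: {stream!r}")
    if version < 1:
        raise ValueError("version must be >= 1")
    parts = ["vh", env]
    if vh_id is not None:
        parts.append(vh_id)
    parts += [domain, stream, event, f"v{version}"]
    for part in parts:
        if not SEGMENT.fullmatch(part):
            raise ValueError(f"invalid segment: {part!r}")
    return ".".join(parts)


def parse_topic(topic: str) -> dict:
    """Split a topic name into its fields; vh_id is None for Primary names."""
    parts = topic.split(".")
    if len(parts) not in (6, 7) or parts[0] != "vh":
        raise ValueError(f"not a vh topic: {topic!r}")
    # Extended names carry the Virtual Human identifier right after <env>.
    vh_id = parts.pop(2) if len(parts) == 7 else None
    _, env, domain, stream, event, version = parts
    match = re.fullmatch(r"v(\d+)", version)
    if env not in ENVS or stream not in STREAMS or match is None:
        raise ValueError(f"invalid topic: {topic!r}")
    return {"env": env, "vh_id": vh_id, "domain": domain,
            "stream": stream, "event": event, "version": int(match.group(1))}
```

Calling build_topic("prod", "affect", "res", "infer", 1) yields the Primary example above; passing vh_id="vh42" yields the Extended one.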

Stream Semantics

Streams describe how data is used and guide storage settings.

Stream   Description                      Retention
raw      Unprocessed input                Keep by time
proc     Processed or enriched output     Keep by time
req      Request messages                 Keep by time
res      Response messages                Keep by time
evt      Notifications or events          Keep by time
state    Current snapshot                 Log compaction (keep latest per key)
cmd      Commands to an actuator          Keep by time
dlq      Dead-letter queue for failures   Keep longer
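
As an illustration, the retention column can be expressed as Kafka topic-level settings. This is a sketch only: cleanup.policy and retention.ms are standard Kafka topic configs, but the function name topic_config and the 7-day and 30-day periods are placeholder assumptions, not values set by this document.

```python
DAY_MS = 24 * 60 * 60 * 1000

# Placeholder retention periods (assumptions, not mandated values).
DEFAULT_RETENTION_MS = 7 * DAY_MS
DLQ_RETENTION_MS = 30 * DAY_MS

TIME_BASED_STREAMS = {"raw", "proc", "req", "res", "evt", "cmd"}


def topic_config(stream: str) -> dict:
    """Return Kafka topic-level config overrides for a stream type."""
    if stream == "state":
        # Log compaction keeps the latest record per key.
        return {"cleanup.policy": "compact"}
    if stream == "dlq":
        # Dead letters are kept longer than regular traffic.
        return {"cleanup.policy": "delete",
                "retention.ms": str(DLQ_RETENTION_MS)}
    if stream in TIME_BASED_STREAMS:
        return {"cleanup.policy": "delete",
                "retention.ms": str(DEFAULT_RETENTION_MS)}
    raise ValueError(f"unknown stream: {stream!r}")
```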

Message Structure: Keys, Headers, Correlation

This section standardizes message metadata. It is intentionally simple.

Record Key (Partitioning)

  • Use a stable per-conversation key as the Kafka record key for conversational and state topics.
  • For dead-letter topics, use the message identifier as the record key.
  • The chosen key must also appear in the payload as a top-level field.
  • The field name is implementation-defined and not mandated by this document.
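
The key rules above could be applied as in the sketch below. The field name conversationId is hypothetical, since this document leaves the field name to the implementation; message_id is taken from the payload templates. Treating every non-DLQ stream as conversation-keyed is also a simplifying assumption.

```python
def record_key(stream: str, payload: dict,
               conversation_field: str = "conversationId",
               message_field: str = "message_id") -> bytes:
    """Pick the Kafka record key from a top-level payload field."""
    # Dead-letter topics are keyed by message identifier; other streams
    # here are keyed by the stable per-conversation identifier.
    field = message_field if stream == "dlq" else conversation_field
    if field not in payload:
        # The key must also be present in the payload as a top-level field.
        raise KeyError(f"payload is missing key field {field!r}")
    return str(payload[field]).encode("utf-8")
```

Because the key is read from the payload, the requirement that the key also appears as a top-level field is satisfied by construction.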

Headers

Header                    Required              Description
traceId                   Yes                   End-to-end tracing
correlationId             Required on req/res   Must match between request and response
timestamp                 Yes                   RFC3339 timestamp
schemaRef                 Optional              Schema reference
ttlMs                     Optional              Time-to-live
Other component headers   Optional              Implementation-specific
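
A minimal sketch of building these headers follows. The function name build_headers is hypothetical, and representing headers as a list of (name, bytes) pairs with UTF-8 encoded values is an assumption about the client library in use.

```python
from datetime import datetime, timezone
from typing import List, Optional, Tuple


def build_headers(stream: str, trace_id: str,
                  correlation_id: Optional[str] = None,
                  schema_ref: Optional[str] = None,
                  ttl_ms: Optional[int] = None) -> List[Tuple[str, bytes]]:
    """Build the standard headers, enforcing correlationId on req/res."""
    if stream in {"req", "res"} and not correlation_id:
        raise ValueError("correlationId is required on req/res topics")
    # RFC 3339 timestamp in UTC, written with a trailing "Z".
    now = datetime.now(timezone.utc).isoformat(timespec="milliseconds")
    values = {
        "traceId": trace_id,
        "correlationId": correlation_id,
        "timestamp": now.replace("+00:00", "Z"),
        "schemaRef": schema_ref,
        "ttlMs": None if ttl_ms is None else str(ttl_ms),
    }
    # Optional headers that were not supplied are left out entirely.
    return [(name, value.encode("utf-8"))
            for name, value in values.items() if value]
```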

Payload Templates

Examples are intentionally minimal. Add only fields your domain needs.

Request (.req.)

{
  "message_id": "...",
  "correlationId": "...",
  "payload": {},
  "schemaVersion": 1
}

Response (.res.)

{
  "message_id": "...",
  "correlationId": "...",
  "ok": true,
  "payload": {},
  "latencyMs": 123,
  "schemaVersion": 1
}

State (.state.) — Log Compacted

{
  "value": {},
  "updatedAt": "2025-10-01T12:34:56Z",
  "schemaVersion": 1
}

Correlation

  • Every request must have a correlationId.
  • The matching response must repeat the same correlationId.
  • Responding systems should set latencyMs in responses; other systems may compute latency independently.
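
The correlation rules can be sketched as a pair of helpers that follow the request and response templates. The function names are hypothetical, and using random UUIDs for message_id and correlationId and a monotonic clock for latency are assumptions; this document does not prescribe an identifier format or a measurement method.

```python
import time
import uuid


def make_request(payload: dict) -> dict:
    """Create a request with a fresh correlationId."""
    return {
        "message_id": str(uuid.uuid4()),
        "correlationId": str(uuid.uuid4()),
        "payload": payload,
        "schemaVersion": 1,
    }


def make_response(request: dict, payload: dict, ok: bool,
                  started_at: float) -> dict:
    """Create a response that repeats the request's correlationId.

    started_at is a time.monotonic() reading taken when handling began.
    """
    latency_ms = int((time.monotonic() - started_at) * 1000)
    return {
        "message_id": str(uuid.uuid4()),
        "correlationId": request["correlationId"],
        "ok": ok,
        "payload": payload,
        "latencyMs": latency_ms,
        "schemaVersion": 1,
    }
```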

Versioning Policy

  • Increase the topic suffix ...<v#> only for breaking changes.
  • Run old and new topics in parallel during migration.
  • For additive, non-breaking changes, keep the same topic and increase the payload schemaVersion.
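
The policy above reduces to a small decision, sketched below. The function name next_version is hypothetical, and leaving schemaVersion unchanged when the topic version is bumped is an assumption, since this document does not say how the two counters interact.

```python
import re
from typing import Tuple


def next_version(topic: str, schema_version: int,
                 breaking: bool) -> Tuple[str, int]:
    """Return the (topic, schemaVersion) pair to use after a change."""
    if not breaking:
        # Additive change: same topic, higher payload schemaVersion.
        return topic, schema_version + 1
    match = re.fullmatch(r"(.+\.v)(\d+)", topic)
    if match is None:
        raise ValueError(f"topic has no version suffix: {topic!r}")
    # Breaking change: new topic with the next <v#> suffix. The old topic
    # keeps running in parallel during migration.
    new_topic = f"{match.group(1)}{int(match.group(2)) + 1}"
    return new_topic, schema_version
```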

Security and Access Control

  • Grant ACLs by domain and environment.

Example:

Producers: Write → vh.prod.affect.*
Consumers: Read → vh.prod.affect.*

Guidelines:

  • Keep environments isolated (for example dev clients must not write to prod).
  • Use least privilege.
  • Give each service access only to the topics it needs.
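
To illustrate the intent of these rules, the sketch below models grants as wildcard patterns and checks a topic against them. It is not how a broker enforces ACLs (Kafka expresses this kind of rule as a prefixed resource pattern); the service name affect-service, the GRANTS table, and the is_allowed function are all hypothetical.

```python
from fnmatch import fnmatchcase

# Hypothetical grants: one domain, one environment, per service.
GRANTS = {
    "affect-service": {
        "write": ["vh.prod.affect.*"],
        "read": ["vh.prod.affect.*"],
    },
}


def is_allowed(service: str, operation: str, topic: str) -> bool:
    """Return True if the service holds a grant covering this topic."""
    patterns = GRANTS.get(service, {}).get(operation, [])
    # No matching grant means no access (least privilege by default).
    return any(fnmatchcase(topic, pattern) for pattern in patterns)
```

With these grants, is_allowed("affect-service", "write", "vh.dev.affect.res.infer.v1") is False, which keeps the prod service out of dev topics.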

Monitoring and Operations

Monitor the following:

  • Produce rates
  • Consume rates
  • Consumer lag
  • Error rates
  • Response latency
  • Dead-letter queue (DLQ) volume

Set Service Level Objectives (SLOs) for request/response latency, including both median and tail latency.
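
A latency SLO that covers both median and tail latency could be checked as sketched below. Using the 99th percentile as the tail measure and the target values passed by the caller are assumptions; this document does not fix specific percentiles or thresholds.

```python
from statistics import median, quantiles
from typing import Dict, Sequence


def latency_summary(latencies_ms: Sequence[float]) -> Dict[str, float]:
    """Summarize latencyMs samples as median (p50) and tail (p99)."""
    if len(latencies_ms) < 2:
        raise ValueError("need at least two samples")
    # quantiles(n=100) returns 99 cut points; index 98 is the 99th percentile.
    p99 = quantiles(latencies_ms, n=100, method="inclusive")[98]
    return {"p50": median(latencies_ms), "p99": p99}


def meets_slo(latencies_ms: Sequence[float],
              p50_target_ms: float, p99_target_ms: float) -> bool:
    """Return True only if both the median and the tail are within target."""
    summary = latency_summary(latencies_ms)
    return (summary["p50"] <= p50_target_ms
            and summary["p99"] <= p99_target_ms)
```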

Dead-Letter Queues

Each dead-letter message should include the original payload and the error details so that the message can be replayed.

Example:

{
  "error": {
    "code": "VALIDATION_ERROR",
    "message": "..."
  },
  "original": {
    "topic": "vh.dev.affect.req.infer.v1",
    "headers": {},
    "value": {}
  },
  "receivedAt": "2025-10-01T12:45:00Z"
}
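
A sketch of producing and replaying such an envelope follows. The function names to_dead_letter and replay_target are hypothetical, and stamping receivedAt with the current UTC time at the moment of wrapping is an assumption about when the timestamp is taken.

```python
from datetime import datetime, timezone
from typing import Tuple


def to_dead_letter(topic: str, headers: dict, value: dict,
                   code: str, message: str) -> dict:
    """Wrap a failed message in the dead-letter envelope shown above."""
    received_at = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    return {
        "error": {"code": code, "message": message},
        "original": {"topic": topic, "headers": headers, "value": value},
        "receivedAt": received_at,
    }


def replay_target(envelope: dict) -> Tuple[str, dict, dict]:
    """Recover (topic, headers, value) so the message can be re-sent."""
    original = envelope["original"]
    return original["topic"], original["headers"], original["value"]
```

Keeping the original topic and headers alongside the value means a replay tool needs nothing beyond the dead-letter message itself.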