Apache Kafka
Server-side event streaming to Apache Kafka via the kafkajs client. Each event is serialized as JSON, keyed for partition-friendly ordering, and produced to a configurable topic. Supports SASL/SSL authentication (Confluent Cloud, AWS MSK, SCRAM), configurable compression (gzip, snappy, lz4, zstd), per-rule topic and key overrides, and graceful shutdown via destroy().
Kafka is a server destination in the walkerOS flow:
Receives events server-side from the collector, serializes them as JSON with a partition-friendly message key, and produces them to a Kafka topic for downstream stream processing (Flink, Spark, ksqlDB, Kafka Connect, consumers).
Installation
- Integrated
- Bundled
Configuration
This destination uses the standard destination config wrapper (consent, data, env, id, ...). For the shared fields see destination configuration. Package-specific fields live under config.settings and are listed below.
Settings
| Property | Type | Description | More |
|---|---|---|---|
kafka | object | Kafka connection and producer settings. |
Mapping
Per-event rules under config.mapping. For the standard rule fields (consent, condition, data, batch, name, policy) see mapping.
| Property | Type | Description | More |
|---|---|---|---|
key | string | Override message key mapping path for this rule (e.g. data.id). Takes precedence over settings.kafka.key. | |
topic | string | Override Kafka topic for this rule. Takes precedence over settings.kafka.topic. |
Examples
default event
ignored event
key from user
mapped data
mapped event name
topic override
The destination creates a single long-lived kafkajs producer during init() and calls producer.connect() before accepting events. On flow hot-swap or server shutdown, destroy() calls producer.disconnect() to flush in-flight messages and close TCP connections.
Message format
Events are serialized as JSON and produced with the following structure:
- topic, from
settings.kafka.topic(ormapping.settings.topicoverride) - key, resolved from
settings.kafka.key(ormapping.settings.keyoverride) mapping path; defaults to the event name with spaces replaced by_(e.g.page_view,order_complete) for partition-based ordering - value, mapped payload (when
data.mapis configured) or the full walkerOS event,JSON.stringify()-ed - headers,
content-type: application/jsonplus any staticsettings.kafka.headers - timestamp, event timestamp as string (ms since epoch)
Use mapping.settings.topic to route specific events to dedicated topics (e.g. orders to orders-stream, identities to identity-stream). Use mapping.settings.key to set a key path per rule (e.g. data.order_id for order events).