Apache Kafka
Server-side event streaming to Apache Kafka via the kafkajs client. Each event is serialized as JSON, keyed for partition-friendly ordering, and produced to a configurable topic. Supports SASL/SSL authentication (Confluent Cloud, AWS MSK, SCRAM), configurable compression (gzip, snappy, lz4, zstd), per-rule topic and key overrides, and graceful shutdown via destroy().
Kafka is a server destination in the walkerOS flow. It receives events server-side from the collector, serializes them as JSON with a partition-friendly message key, and produces them to a Kafka topic for downstream stream processing (Flink, Spark, ksqlDB, Kafka Connect, consumers).
Installation
- Integrated
- Bundled
Configuration
This destination uses the standard destination config wrapper (consent, data, env, id, ...). For the shared fields see destination configuration. Package-specific fields live under config.settings and are listed below.
Settings
| Property | Type | Description | More |
|---|---|---|---|
| kafka | kafka | Kafka connection and producer settings. | |
| brokers | Array<string> | Kafka broker addresses (host:port). At least one required. | |
| clientId | string | Kafka client ID. Default: walkeros. | |
| ssl | boolean \| object | TLS configuration. Set true for default TLS, or provide a tls.ConnectionOptions object for mTLS. | |
| sasl | sasl | SASL authentication config. Required for Confluent Cloud, AWS MSK with IAM, etc. | |
| mechanism | | SASL authentication mechanism. | |
| username | string | Username for plain/scram mechanisms. | |
| password | string | Password for plain/scram mechanisms. | |
| accessKeyId | string | AWS access key ID for IAM auth (mechanism: aws). | |
| secretAccessKey | string | AWS secret access key for IAM auth (mechanism: aws). | |
| sessionToken | string | AWS session token for temporary credentials (mechanism: aws). | |
| authorizationIdentity | string | AWS authorization identity (mechanism: aws). | |
| connectionTimeout | integer | Connection timeout in ms. Default: 1000. | |
| requestTimeout | integer | Request timeout in ms. Default: 30000. | |
| topic | string | Target Kafka topic name. | |
| acks | integer | Acknowledgement level. -1 = all replicas, 0 = fire-and-forget, 1 = leader only. Default: -1. | |
| timeout | integer | Broker response timeout in ms. Default: 30000. | |
| compression | | Message compression codec. Default: gzip. Snappy/LZ4/ZSTD require additional npm packages. | |
| idempotent | boolean | Enable idempotent producer for exactly-once delivery. Default: false. | |
| allowAutoTopicCreation | boolean | Allow auto-creation of topics on the broker. Default: false. | |
| key | string | Mapping value path for message key derivation (e.g. user.id, data.userId). Default: entity_action. | |
| headers | Record<string, string> | Static headers added to every message. | |
| retry | retry | Retry configuration for transient failures. | |
| maxRetryTime | integer | Max total retry wait in ms. Default: 30000. | |
| initialRetryTime | integer | First retry delay in ms. Default: 300. | |
| retries | integer | Max retry count. Default: 5. | |
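As an illustration, the settings above could be combined for a TLS and SCRAM secured cluster roughly as follows. This is a sketch, not a reference config: the broker address, topic name, credentials, and header are placeholders, the nesting of every field under settings.kafka is assumed from the table order, and the mechanism value `scram-sha-256` is the kafkajs spelling, assumed to be accepted here.

```typescript
// Hypothetical destination config built from the settings table.
// All concrete values are placeholders; nesting under settings.kafka is an assumption.
const kafkaDestinationConfig = {
  settings: {
    kafka: {
      brokers: ["broker-1.example.com:9092"], // at least one host:port entry
      clientId: "walkeros",
      ssl: true, // default TLS; pass an options object instead for mTLS
      sasl: {
        mechanism: "scram-sha-256", // assumed to follow kafkajs mechanism names
        username: "<username>",
        password: "<password>",
      },
      topic: "walkeros-events",
      acks: -1, // wait for all in-sync replicas
      compression: "gzip",
      key: "user.id", // per-user partitioning
      headers: { source: "walkeros" },
      retry: { retries: 5, initialRetryTime: 300, maxRetryTime: 30000 },
    },
  },
};
```

In a real deployment the credentials would normally come from environment variables or a secret store rather than literals.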
Mapping
Per-event rules under config.mapping. For the standard rule fields (consent, condition, data, batch, name, policy) see mapping.
| Property | Type | Description | More |
|---|---|---|---|
| key | string | Override message key mapping path for this rule (e.g. data.id). Takes precedence over settings.kafka.key. | |
| topic | string | Override Kafka topic for this rule. Takes precedence over settings.kafka.topic. | |
Examples
Default event
An event is produced to the configured Kafka topic with the full JSON body and entity_action as the message key.
Key from user id
A settings.kafka.key path resolves the message key from the event, here using user.id for per-user partitioning.
Mapped payload
A data mapping transforms the event payload before producing it as the Kafka message value.
Renamed event
A mapping renames the event which also changes the default Kafka message key used for partitioning.
Topic override
A mapping rule overrides the destination topic so specific events are routed to a dedicated stream.
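The cases above could be expressed in one destination config along these lines. This is a sketch under assumptions: the entity/action nesting of mapping rules, the event names, the renamed event name, the data.map paths, and the topic names are all illustrative and not taken from the package.

```typescript
// Illustrative config covering the examples above; every concrete name is a placeholder.
const exampleConfig = {
  settings: {
    kafka: {
      brokers: ["localhost:9092"],
      topic: "walkeros-events", // default topic for all events
      key: "user.id", // "Key from user id": resolve the key from the event
    },
  },
  mapping: {
    page: {
      // "Renamed event": without a key path, the default key would follow the new name.
      view: { name: "pageview" },
    },
    product: {
      // "Mapped payload": the mapped object becomes the message value.
      view: { data: { map: { id: "data.id", name: "data.name" } } },
    },
    order: {
      // "Topic override": route order events to a dedicated stream with their own key.
      complete: { settings: { topic: "orders-stream", key: "data.id" } },
    },
  },
};
```

Omitting settings.kafka.key entirely gives the "Default event" behavior, where the key is derived from the event name.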
The destination creates a single long-lived kafkajs producer during init() and calls producer.connect() before accepting events. On flow hot-swap or server shutdown, destroy() calls producer.disconnect() to flush in-flight messages and close TCP connections.
Message format
Events are serialized as JSON and produced with the following structure:
- topic, from settings.kafka.topic (or mapping.settings.topic override)
- key, resolved from settings.kafka.key (or mapping.settings.key override) mapping path; defaults to the event name with spaces replaced by _ (e.g. page_view, order_complete) for partition-based ordering
- value, mapped payload (when data.map is configured) or the full walkerOS event, JSON.stringify()-ed
- headers, content-type: application/json plus any static settings.kafka.headers
- timestamp, event timestamp as string (ms since epoch)
Use mapping.settings.topic to route specific events to dedicated topics (e.g. orders to orders-stream, identities to identity-stream). Use mapping.settings.key to set a key path per rule (e.g. data.order_id for order events).
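The message structure described above can be sketched as a pure function. The names `SketchEvent`, `getByPath`, and `buildRecord` are hypothetical helpers, not the package's API, and falling back to the default key when a key path resolves to nothing is an assumption. The returned object follows the kafkajs producer record shape (topic plus a messages array).

```typescript
// Minimal event shape for this sketch; the real walkerOS event has more fields.
interface SketchEvent {
  name: string;
  timestamp: number;
  data?: Record<string, unknown>;
  user?: Record<string, unknown>;
}

interface KafkaSettings {
  topic: string;
  key?: string;
  headers?: Record<string, string>;
}

// Resolve a dot path such as "user.id" against an object; undefined if any step is missing.
function getByPath(source: unknown, path: string): unknown {
  return path.split(".").reduce<unknown>((current, part) => {
    if (current !== null && typeof current === "object") {
      return (current as Record<string, unknown>)[part];
    }
    return undefined;
  }, source);
}

// Build the record for one event. Rule-level topic/key take precedence over settings.
function buildRecord(
  event: SketchEvent,
  kafka: KafkaSettings,
  rule: { topic?: string; key?: string } = {},
  payload: unknown = event, // mapped payload when data.map is configured
) {
  const keyPath = rule.key ?? kafka.key;
  const resolved = keyPath ? getByPath(event, keyPath) : undefined;
  // Assumption: an unresolved key path falls back to the default entity_action key.
  const key =
    resolved !== undefined && resolved !== null
      ? String(resolved)
      : event.name.replace(/ /g, "_");
  return {
    topic: rule.topic ?? kafka.topic,
    messages: [
      {
        key,
        value: JSON.stringify(payload),
        headers: { "content-type": "application/json", ...kafka.headers },
        timestamp: String(event.timestamp),
      },
    ],
  };
}
```

Keeping the key stable per entity (for example per user or per order) means all messages for that entity land on the same partition and stay ordered relative to each other.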
Setup
Kafka topic creation requires explicit numPartitions and replicationFactor.
There is no universally correct default for either: replicationFactor must be
less than or equal to broker count, and numPartitions is a function of
expected throughput and consumer parallelism. The boolean form setup: true is
rejected with an error listing the required fields. Only the object form is
valid.
Provision a topic once per environment with the CLI:
Output: setup: ok destination.kafka plus a JSON line reporting
{ topicCreated, schemaRegistered }. The command is idempotent and safe to
re-run. Drift on numPartitions, replicationFactor, or configEntries is
logged as WARN setup.drift {...}; the broker is never mutated automatically.
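To make the drift check concrete, here is a minimal sketch of comparing the desired setup against what the cluster reports. `TopicState`, `Drift`, and `detectDrift` are hypothetical names, and the shape of the reported entries is an assumption; the package's actual log payload may differ.

```typescript
interface TopicState {
  numPartitions: number;
  replicationFactor: number;
  configEntries?: Record<string, string>;
}

interface Drift {
  field: string;
  desired: unknown;
  actual: unknown;
}

// Compare desired vs. actual topic state and report differences without changing anything.
function detectDrift(desired: TopicState, actual: TopicState): Drift[] {
  const drift: Drift[] = [];
  if (desired.numPartitions !== actual.numPartitions) {
    drift.push({ field: "numPartitions", desired: desired.numPartitions, actual: actual.numPartitions });
  }
  if (desired.replicationFactor !== actual.replicationFactor) {
    drift.push({ field: "replicationFactor", desired: desired.replicationFactor, actual: actual.replicationFactor });
  }
  // Only entries the operator declared are checked; broker defaults are ignored.
  for (const [name, value] of Object.entries(desired.configEntries ?? {})) {
    const current = actual.configEntries?.[name];
    if (current !== value) {
      drift.push({ field: `configEntries.${name}`, desired: value, actual: current });
    }
  }
  return drift;
}
```

An empty result means the topic already matches the declared setup, which is what makes re-running the command safe.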
config.setup:
- false (default): no provisioning. Operator must run setup explicitly.
- true: rejected at runtime with an actionable error. There are no safe defaults for partition count or replication factor.
- { numPartitions, replicationFactor, ... }: object form is the only valid form. See the Setup interface in the package for full options.
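The three forms above could be handled roughly as follows. `SetupOptions` and `resolveSetup` are hypothetical names and the error wording is illustrative. The returned object is shaped like the argument to kafkajs `admin.createTopics()`, which expects configEntries as an array of name/value pairs.

```typescript
interface SetupOptions {
  numPartitions: number;
  replicationFactor: number;
  topic?: string;
  configEntries?: Record<string, string>;
  validateOnly?: boolean;
}

// Turn config.setup into a createTopics-style request, or null when provisioning is off.
function resolveSetup(
  setup: boolean | Partial<SetupOptions> | undefined,
  fallbackTopic: string, // settings.kafka.topic
) {
  if (setup === undefined || setup === false) return null; // default: no provisioning
  if (setup === true) {
    throw new Error("setup: true is not supported; provide { numPartitions, replicationFactor }");
  }
  const { numPartitions, replicationFactor } = setup;
  if (typeof numPartitions !== "number" || typeof replicationFactor !== "number") {
    throw new Error("setup requires numeric numPartitions and replicationFactor");
  }
  return {
    validateOnly: setup.validateOnly ?? false,
    topics: [
      {
        topic: setup.topic ?? fallbackTopic,
        numPartitions,
        replicationFactor,
        // Convert the object form into name/value pairs.
        configEntries: Object.entries(setup.configEntries ?? {}).map(([name, value]) => ({ name, value })),
      },
    ],
  };
}
```

Rejecting `true` outright keeps a misconfigured environment from silently creating a single-partition, unreplicated topic.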
Required fields
| Field | Type | Notes |
|---|---|---|
| numPartitions | number | Required at runtime. No safe default. |
| replicationFactor | number | Required at runtime. Must be <= broker count. |
| topic | string | Falls back to settings.kafka.topic when omitted. |
Optional fields
| Field | Type | Notes |
|---|---|---|
| configEntries | object | Topic-level config, e.g. { "retention.ms": "604800000" }. |
| schemaRegistry | object | Confluent Schema Registry binding (see below). |
| validateOnly | boolean | kafkajs broker-side dry-run. No topic is created. |
Example
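A setup block using the fields from the tables above might look like this. The partition count, replication factor, broker address, and topic name are placeholders chosen for illustration, not recommendations; schemaRegistry is left out because its fields are not listed here.

```typescript
// Illustrative config with an object-form setup; all values are placeholders.
const configWithSetup = {
  settings: {
    kafka: { brokers: ["localhost:9092"], topic: "walkeros-events" },
  },
  setup: {
    numPartitions: 6, // size for expected throughput and consumer parallelism
    replicationFactor: 3, // must be <= broker count
    configEntries: { "retention.ms": "604800000" }, // 7 days
    validateOnly: false, // set true for a broker-side dry-run
  },
};
```

Because topic is omitted from setup, it falls back to settings.kafka.topic.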
Schema Registry (optional)
The schema is registered via the Confluent Schema Registry REST API. The
optional compatibility level is set on the subject after registration.
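The two REST calls described above can be sketched as plain request descriptions. `SchemaBinding` and `buildSchemaRegistryRequests` are hypothetical, and the binding field names (url, subject, schema, compatibility) are assumptions, not the package's option names. The endpoint paths and content type follow the public Confluent Schema Registry REST API.

```typescript
// Hypothetical binding shape; the package's schemaRegistry options may be named differently.
interface SchemaBinding {
  url: string;
  subject: string;
  schema: string; // schema definition as a string
  compatibility?: string; // e.g. "BACKWARD"
}

interface HttpRequest {
  method: "POST" | "PUT";
  url: string;
  headers: Record<string, string>;
  body: string;
}

// Build the registration request and, if requested, the compatibility update that follows it.
function buildSchemaRegistryRequests(binding: SchemaBinding): HttpRequest[] {
  const base = binding.url.replace(/\/+$/, ""); // drop trailing slashes
  const subject = encodeURIComponent(binding.subject);
  const headers = { "Content-Type": "application/vnd.schemaregistry.v1+json" };
  const requests: HttpRequest[] = [
    {
      method: "POST",
      url: `${base}/subjects/${subject}/versions`,
      headers,
      body: JSON.stringify({ schema: binding.schema }),
    },
  ];
  if (binding.compatibility) {
    requests.push({
      method: "PUT",
      url: `${base}/config/${subject}`,
      headers,
      body: JSON.stringify({ compatibility: binding.compatibility }),
    });
  }
  return requests;
}
```

Each returned entry can be passed to any HTTP client; the order matters because the compatibility level is applied to the subject after the schema is registered.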
Runtime error when the topic is missing
When setup was not run and the topic does not exist on the cluster, push()
catches the kafkajs UNKNOWN_TOPIC_OR_PARTITION error and logs an actionable
message pointing the operator at walkeros setup destination.<id>. Run setup
with explicit numPartitions and replicationFactor to provision the topic.
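The error handling described above might be sketched like this. `describeProduceError` is a hypothetical helper and the message wording is illustrative; the sketch assumes the kafkajs protocol error exposes the error name on a `type` property.

```typescript
// Return an actionable message for a missing topic, or null for any other error.
function describeProduceError(
  error: unknown,
  destinationId: string,
  topic: string,
): string | null {
  // Assumption: kafkajs protocol errors carry the broker error name in `type`.
  const type =
    typeof error === "object" && error !== null
      ? (error as { type?: unknown }).type
      : undefined;
  if (type !== "UNKNOWN_TOPIC_OR_PARTITION") return null;
  return (
    `Kafka topic "${topic}" does not exist. ` +
    `Run "walkeros setup destination.${destinationId}" with explicit ` +
    `numPartitions and replicationFactor to provision it.`
  );
}
```

Returning null for unrelated errors lets the caller fall through to its normal retry or logging path.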