Kafka
kafkaSource
The kafkaSource function creates a new input source that receives messages from Kafka. The example below joins the consumer group consumer-group-1 and receives JSON-encoded messages from the topics topic1, topic2, and topic3.
Application.create()
    .input()
        .add(kafkaSource({
            broker: "my-kafka-broker:9092",
            encoder: new JsonMessageEncoder(),
            group: "consumer-group-1",
            topics: "topic1, topic2, topic3",
        }))
        .done()
    // ...
    .run();
Configuration
The available configuration options are:
| name | description |
|---|---|
| broker | the DNS name including port of one Kafka broker to connect to |
| encoder | defines how the raw data received from Kafka should be converted into message objects |
| group | the name of the consumer group to join |
| topics | a comma-separated list of topics to consume from |
| offsetCommitInterval | defines how often offsets will be committed to Kafka, the default value is 5 seconds |
| consumeTimeout | sets max.poll.interval.ms, the default value is 50ms |
| maxBytesPerPartition | sets max.partition.fetch.bytes, the default value is 10MiB |
| sessionTimeout | sets session.timeout.ms, the default value is 30000ms (30s) |
| preprocessor | optional preprocessing function that allows for manipulation of the incoming message before decoding the message's payload. This can for instance be used for messages that are enveloped in some way |
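
The defaults in the table above can be summarized in a short sketch. The `KafkaSourceTuning` interface and the `withDefaults` helper are hypothetical names used only for illustration, and the units (milliseconds for times, bytes for sizes) are an assumption; the option types actually exposed by the library may differ.

```typescript
// Hypothetical shape for the optional tuning settings listed in the table.
// Assumed units: milliseconds for time values, bytes for sizes.
interface KafkaSourceTuning {
    offsetCommitInterval: number; // ms
    consumeTimeout: number;       // ms
    maxBytesPerPartition: number; // bytes
    sessionTimeout: number;       // ms
}

// Default values as documented in the configuration table.
const DEFAULT_TUNING: KafkaSourceTuning = {
    offsetCommitInterval: 5000,             // 5 seconds
    consumeTimeout: 50,                     // 50ms
    maxBytesPerPartition: 10 * 1024 * 1024, // 10MiB
    sessionTimeout: 30000,                  // 30s
};

// Illustrative helper: apply user overrides on top of the documented defaults.
function withDefaults(overrides: Partial<KafkaSourceTuning> = {}): KafkaSourceTuning {
    return { ...DEFAULT_TUNING, ...overrides };
}

// Only the session timeout is overridden; everything else keeps its default.
const tuning = withDefaults({ sessionTimeout: 10000 });
```

In this sketch any option that is left out simply falls back to the documented default, which is how the table describes the optional settings.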
OffsetResetStrategy
Each topic can be configured with an OffsetResetStrategy that determines the point in the message stream at which consumption starts.
| name | description |
|---|---|
| Earliest | if no stored offsets are available for the consumer group, it will start consuming from the earliest message available on the Kafka broker |
| Latest | if no stored offsets are available for the consumer group, it will start consuming from the latest message available on the Kafka broker |
| AlwaysEarliest | ignores stored offsets and will start consuming from the earliest message available on the Kafka broker |
| AlwaysLatest | ignores stored offsets and will start consuming from the latest message available on the Kafka broker |
The OffsetResetStrategy is specified together with each topic name, separated by a pipe character, for example "topic1|earliest, topic2|always-latest".
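
To make the topic|strategy format concrete, here is a minimal sketch of how such a string could be split into topic names and strategies. The `parseTopics` function, the local `OffsetResetStrategy` enum, and the `latest` and `always-earliest` spellings are illustrative assumptions (only `earliest` and `always-latest` appear in the text); the library's own parser may behave differently.

```typescript
// Local stand-in for the strategies listed in the table above.
// The "latest" and "always-earliest" spellings are assumed by analogy.
enum OffsetResetStrategy {
    Earliest = "earliest",
    Latest = "latest",
    AlwaysEarliest = "always-earliest",
    AlwaysLatest = "always-latest",
}

interface TopicConfig {
    name: string;
    // undefined means no strategy was given for this topic
    strategy?: OffsetResetStrategy;
}

const KNOWN_STRATEGIES: string[] = [
    OffsetResetStrategy.Earliest,
    OffsetResetStrategy.Latest,
    OffsetResetStrategy.AlwaysEarliest,
    OffsetResetStrategy.AlwaysLatest,
];

// Split "topic1|earliest, topic2|always-latest" into structured entries.
function parseTopics(topics: string): TopicConfig[] {
    return topics
        .split(",")
        .map((entry) => entry.trim())
        .filter((entry) => entry.length > 0)
        .map((entry): TopicConfig => {
            const parts = entry.split("|").map((part) => part.trim());
            const name = parts[0];
            const rawStrategy = parts[1];
            if (rawStrategy === undefined) {
                return { name };
            }
            if (KNOWN_STRATEGIES.indexOf(rawStrategy) < 0) {
                throw new Error("unknown offset reset strategy: " + rawStrategy);
            }
            return { name, strategy: rawStrategy as OffsetResetStrategy };
        });
}

const parsed = parseTopics("topic1|earliest, topic2|always-latest, topic3");
```

Here `topic3` carries no strategy, so the sketch leaves it undefined rather than guessing which default the library applies.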
Metadata
The following metadata is available in the message handler via ctx.metadata<T>(key):
| name | description |
|---|---|
| KafkaMetadata.Topic | the name of the topic the message was received from |
| KafkaMetadata.Offset | the offset of the message on its topic/partition |
| KafkaMetadata.Partition | the partition number the message belongs to |
| KafkaMetadata.Key | the key associated with the message |
| KafkaMetadata.Timestamp | the date/time when the broker originally received the message |
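
A handler might read these values roughly as follows. To keep the sketch self-contained, `KafkaMetadata` and the context are local stand-ins: the enum's string values, the `MetadataContext` interface, the `describeOrigin` helper, and the value types (for example a numeric offset) are assumptions, and in an application the enum and context would come from the library.

```typescript
// Local stand-in; the real enum is provided by the library and its
// underlying string values may differ.
enum KafkaMetadata {
    Topic = "topic",
    Offset = "offset",
    Partition = "partition",
    Key = "key",
    Timestamp = "timestamp",
}

// Minimal context exposing only the metadata<T>(key) accessor.
interface MetadataContext {
    metadata<T>(key: string): T;
}

// Hypothetical helper: describe where a message came from, e.g. for logging.
function describeOrigin(ctx: MetadataContext): string {
    const topic = ctx.metadata<string>(KafkaMetadata.Topic);
    const partition = ctx.metadata<number>(KafkaMetadata.Partition);
    const offset = ctx.metadata<number | string>(KafkaMetadata.Offset);
    return `${topic}[${partition}]@${offset}`;
}

// Fake context backed by a plain object, standing in for the real handler context.
const fakeValues: { [key: string]: unknown } = {
    [KafkaMetadata.Topic]: "topic1",
    [KafkaMetadata.Partition]: 3,
    [KafkaMetadata.Offset]: 42,
};
const fakeCtx: MetadataContext = {
    metadata: <T>(key: string): T => fakeValues[key] as T,
};

const origin = describeOrigin(fakeCtx);
```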
Metrics
| Name | Description | Type | Tags |
|---|---|---|---|
| cookie_cutter.kafka_consumer.request_queue_size | the number of pending network requests to brokers | gauge | topic, partition |
| cookie_cutter.kafka_consumer.incoming_batch_size | number of messages received from broker in a single fetch request | gauge | |
| cookie_cutter.kafka_consumer.offset_committed | the last offset that was committed to the broker | gauge | topic, partition |
| cookie_cutter.kafka_consumer.offset_high_watermark | the current high watermark offset | gauge | topic, partition |
| cookie_cutter.kafka_consumer.offset_low_watermark | the current low watermark offset | gauge | topic, partition |
| cookie_cutter.kafka_consumer.lag | the delta between high watermark offset and committed offset | gauge | topic, partition |
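
The lag metric is a simple difference between two of the other gauges. The sketch below restates that arithmetic; `PartitionOffsets` and `consumerLag` are hypothetical names for illustration and are not part of the library.

```typescript
// Offsets for a single topic/partition pair, mirroring the gauges above.
interface PartitionOffsets {
    topic: string;
    partition: number;
    highWatermark: number; // offset_high_watermark
    committed: number;     // offset_committed
}

// lag = high watermark offset - committed offset
function consumerLag(offsets: PartitionOffsets): number {
    return offsets.highWatermark - offsets.committed;
}

const exampleLag = consumerLag({
    topic: "topic1",
    partition: 0,
    highWatermark: 1500,
    committed: 1420,
});
```

A lag that keeps growing for a partition suggests the consumer is falling behind the producers writing to it.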
kafkaSink
The kafkaSink function creates an output sink that sends published messages to Kafka.
Application.create()
    .output()
        .published(kafkaSink({
            broker: "my-kafka-broker:9092",
            encoder: new JsonMessageEncoder(),
            defaultTopic: "topic1",
        }))
        .done()
    // ...
    .run();
Configuration
The available configuration options are:
| name | description |
|---|---|
| broker | the DNS name including port of one Kafka broker to connect to |
| encoder | defines how message objects should be converted into the raw data that is sent to Kafka |
| defaultTopic | the name of the topic to publish to if no other topic was specified in the message handler |
Publishing to Kafka
Application.create()
    .dispatch({
        onSomeInputMessage(msg: ISomeInputMessage, ctx: IDispatchContext) {
            // publish a message to the default topic
            ctx.publish(Output, {
                field: "value",
            });
            // publish a message to a different topic
            ctx.publish(Output, {
                field: "value",
            }, { [KafkaMetadata.Topic]: "my-topic" });
            // explicitly set the key (recommended to always do that)
            ctx.publish(Output, {
                field: "value",
            }, { [KafkaMetadata.Key]: "xyz" });
        }
    })
    .output()
        .published(kafkaSink({
            broker: "my-kafka-broker:9092",
            encoder: new JsonMessageEncoder(),
            defaultTopic: "topic1",
        }))
        .done()
    // ...
    .run();
Metadata
| name | description |
|---|---|
| KafkaMetadata.Topic | the name of the topic to publish to |
| KafkaMetadata.Key | the key to associate with the message; by default the input message's EventSourcedMetadata.Stream is used if available, otherwise the key is null (which results in round-robin assignment to partitions) |
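
The fallback order for the message key described in the table can be sketched as a small function. `resolveKey` and its parameters are hypothetical and only restate the documented precedence; this is not the library's actual implementation.

```typescript
// Illustrative precedence for the Kafka message key:
//   1. an explicit KafkaMetadata.Key passed to ctx.publish
//   2. the input message's EventSourcedMetadata.Stream, if available
//   3. null, which leaves partition assignment to round robin
function resolveKey(
    explicitKey: string | undefined,
    inputStream: string | undefined,
): string | null {
    if (explicitKey !== undefined) {
        return explicitKey;
    }
    if (inputStream !== undefined) {
        return inputStream;
    }
    return null;
}

const keyFromPublish = resolveKey("xyz", "stream-1");    // explicit key wins
const keyFromStream = resolveKey(undefined, "stream-1"); // falls back to the stream
const keyMissing = resolveKey(undefined, undefined);     // no key at all
```

Because a null key spreads messages across partitions, messages that must stay in order relative to each other generally need the same explicit key, which fits the recommendation in the publishing example to always set one.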
Metrics
| Name | Description | Type | Tags |
|---|---|---|---|
| cookie_cutter.kafka_producer.msg_published | the number of messages sent to brokers | increment | topic, event_type, partition, result |