Kafka

kafkaSource

The kafkaSource function creates a new input source that receives messages from Kafka. The example below joins the consumer group consumer-group-1 and receives JSON-encoded messages from the topics topic1, topic2 and topic3.

Application.create()
    .input()
        .add(kafkaSource({
            broker: "my-kafka-broker:9092",
            encoder: new JsonMessageEncoder(),
            group: "consumer-group-1",
            topics: "topic1, topic2, topic3",
        }))
        .done()
    // ...
    .run();

Configuration

The available configuration options are:

name | description
-----|------------
broker | the DNS name, including port, of one Kafka broker to connect to
encoder | defines how the raw data received from Kafka is converted into message objects
group | the name of the consumer group to join
topics | a list of topics to consume from
offsetCommitInterval | defines how often offsets are committed to Kafka; the default value is 5 seconds
consumeTimeout | sets max.poll.interval.ms; the default value is 50ms
maxBytesPerPartition | sets max.partition.fetch.bytes; the default value is 10MiB
sessionTimeout | sets session.timeout.ms; the default value is 30000ms (30s)
preprocessor | optional preprocessing function that allows manipulating the incoming message before its payload is decoded; useful, for instance, for messages that are wrapped in an envelope
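
As a rough sketch of how the optional settings fit together, the configuration below tunes the commit interval and session timeout and unwraps an envelope before decoding. The preprocessor shape shown here (an object with a process function that receives and returns the raw message) and the unwrapEnvelope helper are assumptions for illustration only; the actual interface may differ.

Application.create()
    .input()
        .add(kafkaSource({
            broker: "my-kafka-broker:9092",
            encoder: new JsonMessageEncoder(),
            group: "consumer-group-1",
            topics: "topic1",
            offsetCommitInterval: 10000, // assumed to be in milliseconds
            sessionTimeout: 60000, // sets session.timeout.ms
            preprocessor: {
                // hypothetical envelope unwrapping before the payload is decoded
                process: (msg: any) => ({ ...msg, value: unwrapEnvelope(msg.value) }),
            },
        }))
        .done()
    // ...
    .run();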

OffsetResetStrategy

Each topic can be configured with an OffsetResetStrategy that determines at which point in the message stream the source starts consuming:

name | description
-----|------------
Earliest | if no stored offsets are available for the consumer group, it will start consuming from the earliest message available on the Kafka broker
Latest | if no stored offsets are available for the consumer group, it will start consuming from the latest message available on the Kafka broker
AlwaysEarliest | ignores stored offsets and will start consuming from the earliest message available on the Kafka broker
AlwaysLatest | ignores stored offsets and will start consuming from the latest message available on the Kafka broker

The OffsetResetStrategy is configured together with the topic names, like this: "topic1|earliest, topic2|always-latest".
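
For example, the source below starts consuming topic1 from the earliest available message when the consumer group has no stored offsets, and always consumes topic2 from the latest message:

Application.create()
    .input()
        .add(kafkaSource({
            broker: "my-kafka-broker:9092",
            encoder: new JsonMessageEncoder(),
            group: "consumer-group-1",
            topics: "topic1|earliest, topic2|always-latest",
        }))
        .done()
    // ...
    .run();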

Metadata

The following metadata is available in the message handler via ctx.metadata<T>(key):

name | description
-----|------------
KafkaMetadata.Topic | the name of the topic the message was received from
KafkaMetadata.Offset | the offset of the message on its topic/partition
KafkaMetadata.Partition | the partition number the message belongs to
KafkaMetadata.Key | the key associated with the message
KafkaMetadata.Timestamp | the date/time when the broker originally received the message
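
A minimal sketch of reading these values inside a message handler follows; the handler name, the message type, and the exact types returned for each key are assumptions for illustration:

Application.create()
    .dispatch({
        onSomeInputMessage(msg: ISomeInputMessage, ctx: IDispatchContext) {
            // read Kafka-specific metadata for the current message
            const topic = ctx.metadata<string>(KafkaMetadata.Topic);
            const partition = ctx.metadata<number>(KafkaMetadata.Partition);
            const offset = ctx.metadata<string>(KafkaMetadata.Offset);
            ctx.logger.info("received message", { topic, partition, offset });
        }
    })
    // ...
    .run();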

Metrics

Name | Description | Type | Tags
-----|-------------|------|-----
cookie_cutter.kafka_consumer.request_queue_size | the number of pending network requests to brokers | gauge | topic, partition
cookie_cutter.kafka_consumer.incoming_batch_size | the number of messages received from the broker in a single fetch request | gauge |
cookie_cutter.kafka_consumer.offset_committed | the last offset that was committed to the broker | gauge | topic, partition
cookie_cutter.kafka_consumer.offset_high_watermark | the current high watermark offset | gauge | topic, partition
cookie_cutter.kafka_consumer.offset_low_watermark | the current low watermark offset | gauge | topic, partition
cookie_cutter.kafka_consumer.lag | the delta between the high watermark offset and the committed offset | gauge | topic, partition

kafkaSink

The kafkaSink function creates an output sink that handles published messages. The example below publishes JSON-encoded messages to topic1 unless a different topic is specified in the message handler.

Application.create()
    .output()
        .published(kafkaSink({
            broker: "my-kafka-broker:9092",
            encoder: new JsonMessageEncoder(),
            defaultTopic: "topic1",
        }))
        .done()
    // ...
    .run();

Configuration

The available configuration options are:

name | description
-----|------------
broker | the DNS name, including port, of one Kafka broker to connect to
encoder | defines how message objects are converted into the raw data that is sent to Kafka
defaultTopic | the name of the topic to publish to if no other topic was specified in the message handler

Publishing to Kafka

Application.create()
    .dispatch({
        onSomeInputMessage(msg: ISomeInputMessage, ctx: IDispatchContext) {
            // publish a message to the default topic
            ctx.publish(Output, {
                field: "value",
            });

            // publish a message to a different topic
            ctx.publish(Output, {
                field: "value",
            }, { [KafkaMetadata.Topic]: "my-topic" });

            // explicitly set the key (recommended to always do that)
            ctx.publish(Output, {
                field: "value",
            }, { [KafkaMetadata.Key]: "xyz" });
        }
    })
    .output()
        .published(kafkaSink({
            broker: "my-kafka-broker:9092",
            encoder: new JsonMessageEncoder(),
            defaultTopic: "topic1",
        }))
        .done()
    // ...
    .run();

Metadata

name | description
-----|------------
KafkaMetadata.Topic | the name of the topic to publish to
KafkaMetadata.Key | the key to associate with the message; by default it uses the input message's EventSourcedMetadata.Stream if available, or null (= round-robin assignment to partitions) otherwise

Metrics

Name | Description | Type | Tags
-----|-------------|------|-----
cookie_cutter.kafka_producer.msg_published | the number of messages sent to brokers | increment | topic, event_type, partition, result