Cookie Cutter

Cookie Cutter

  • Introduction
  • API
  • Help

›Modules

Introduction

  • Getting Started
  • Inputs
  • Message Handling
  • Outputs
  • Versioning and Contribution Guide

Components

  • Dispatch Context
  • State
  • Metrics
  • Tracing
  • Logging
  • Validation
  • Encoding
  • Config
  • Testing

Modules

  • Kafka
  • Azure
  • AMQP
  • gRPC
  • ValidateJS
  • MSSQL
  • Timer
  • StatsD
  • Protobuf
  • Prometheus
  • Redis
  • S3
  • Google PubSub

Redis

The Redis module allows services to write to a Redis store or retrieve data from there.

Client

The redisClient can be used to load or store data directly within a message handler function.

Application.create()
    // ...
    .services()
        .add("redis", redisClient({ /* configuration */ }))
        .done()
    .dispatch({
        onIMyObject: async (msg: IMyObject, ctx: IDispatchContext): Promise<void> => {
            const redis = ctx.services.get<IRedisClient>("redis");

            // write/overwrite the object associated with a key
            await redis.putObject(ctx.trace.context, IMyObject, msg, "some-key");

            // retrieve the object associated with a key (returns undefined if no such key)
            const msg = await redis.getObject(ctx.trace.context, IMyObject, "some-key");
        },
    })
    .run();

Configuration

NameDescription
hostthe HTTP endpoint to connect to
portthe port to connect to. Default is 6379
dbindex of the database to connect to. Default is 0
passwordthe password to use to connect to Redis. Default is no password
encoderthe encoder to use when converting the payload to a byte array. This defaults to the NullMessageEncoder which only supports Buffers (=byte arrays) being published
typeMapperonly required if correct type information needs to be emitted
base64Encodedetermines if buffers should be stored in base64 encoding. Default is true

Metrics

NameDescriptionTypeTags
cookie_cutter.redis_client.getA call to get a valueincrementtype, db, result
cookie_cutter.redis_client.setA call to set a valueincrementtype, db, result
cookie_cutter.redis_client.xaddA call to add a value to a streamincrementtype, db, result
cookie_cutter.redis_client.xreadA call to read from a streamincrementtype, db, result
cookie_cutter.redis_client.xreadgroupA call to read from a a stream as part of a consumer groupincrementtype, db, result
cookie_cutter.redis_client.xgroupA call to create a consumer groupincrementtype, db, result
cookie_cutter.redis_client.xackA call to acknowledge a message in a streamincrementtype, db, result
cookie_cutter.redis_client.xpendingA call to query pending messages list of a streamincrementtype, db, result
cookie_cutter.redis_client.xclaimA call to claim a pending message of a streamincrementtype, db, result

redisStreamSource

The redisStreamSource function creates a new input source that receives messages from Redis. The example below joins the consumer group consumer-group-1 and receives JSON encoded messages from streams stream1, stream2 and stream3.

    Application.create()
        .input()
        .add(redisStreamSource({
            host: "localhost",
            streams: ["stream1", "stream2", "stream3"],
            consumerGroup: "consumer-group-1",
        }))
        .done()
        // ...
        .run();

Configuration

NameDescription
hostthe HTTP endpoint to connect to
portthe port to connect to. Default is 6379
dbindex of the database to connect to
passwordthe password to use to connect to Redis. Default is no password
typeMapperonly required if correct type information needs to be emitted
encoderthe encoder to use when converting the payload to a byte array.
streamsa list of stream names to consume from
consumerGroupthe name of the consumer group to join
consumerIdthe id of consumer to use, the default value is a generated guid
consumerGroupStartIdthe ID of the last item in the stream to consider already delivered, the default value is $ (the ID of the last item in the stream)
blockTimeoutthe number of milliseconds we want to block before timing out, the default values is 100 ms
idleTimeoutthe minimum number of milliseconds of idle time a pending message should have before we try to claim it, the default value is 30000 ms (30s)
batchSizethe number of messages receive at a time when consuming streams, the default values is 10
reclaimMessageIntervaldefines how often a client is checking for pending messages from dead consumers and tries to reclaim then
base64Encodedetermines if buffers should be stored in base64 encoding. Default is true

Metadata

The following metadata is available in the message handler via ctx.metadata<T>(key)

namedescription
RedisMetadata.Streamthe name of the stream the messages was received from
RedisMetadata.MessageIdthe id of the message that was received
RedisMetadata.ConsumerIdthe id of the consumer group

Metrics

NameDescriptionTypeTags
cookie_cutter.redis_consumer.input_msg_receivednumber of messages received from redis serverincrementstream_name, consumer_group
cookie_cutter.redis_consumer.input_msg_processednumber of messages processedincrementstream_name, consumer_group, result
cookie_cutter.redis_consumer.input_msgs_claimednumber of pending messages claimedincrementstream_name, consumer_group
cookie_cutter.redis_consumer.pending_msg_sizenumber of pending messages foundgaugestream_name, consumer_group
cookie_cutter.redis_consumer.incoming_batch_sizenumber of messages received from redis server in a single fetch requestgauge

redisStreamSink

The 'redisStreamSink' function creates an output sink that handles published messages.

Application.create()
    .output()
        .published(redisStreamSink({
            host: "localhost",
            stream: "streamName",
        }))
        .done()
    // ...
    .run();

Configuration

NameDescription
hostthe HTTP endpoint to connect to
portthe port to connect to. Default is 6379
dbindex of the database to connect to
encoderthe encoder to use when converting the payload to a byte array.
typeMapperonly required if correct type information needs to be emitted
streamthe name of the stream to publish to if no other stream name was specified in the message handler
maxStreamLengthif defined will limit the length of a stream by truncating it when new messages are published. Default is off

Metadata

The following metadata is available in the message handler via ctx.publish

namedescription
RedisStreamMetadata.Streamthe name of the stream to publish to

Metrics

NameDescriptionTypeTags
cookie_cutter.redis_producer.msg_publishedthe number of messages sent to redis serverincrementstream_name, result
← PrometheusS3 →
  • Client
    • Configuration
    • Metrics
  • redisStreamSource
    • Configuration
    • Metadata
    • Metrics
  • redisStreamSink
    • Configuration
    • Metadata
    • Metrics
Cookie Cutter
Docs
IntroductionKafka
More
Blog
Copyright © 2023 Walmart Inc.