Cookie Cutter

AMQP

amqpSource

The amqpSource function creates a new input source that receives messages from RabbitMQ or any other message broker that implements the AMQP 0-9-1 protocol. The example below consumes JSON-encoded messages from the queue defaultQueueName.

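import { Application, JsonMessageEncoder } from "@walmartlabs/cookie-cutter-core";
import { amqpSource } from "@walmartlabs/cookie-cutter-amqp";

Application.create()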
    .input()
        .add(amqpSource({
            server: {
                host: "localhost",
            },
            queue: {
                name: "defaultQueueName",
            },
            encoder: new JsonMessageEncoder(),
        }))
        .done()
    // ...
    .run();

Configuration

The available configuration options are:

| name | description |
| --- | --- |
| server.host | host name to connect to |
| server.port | port to connect to (default 5672) |
| queue.name | name of the queue to connect to |
| queue.durable | if true (default), the queue survives broker restarts; messages are as persistent as their queue |
| message.expiration | time to live per message in milliseconds (default is no expiration) |
| encoder | defines how the raw data received from the AMQP broker should be converted into message objects |
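The defaults listed in the table can be pictured as a small normalization step. The sketch below is not the library's implementation: the interfaces `IAmqpOptionsSketch` and `IResolvedAmqpOptionsSketch` and the function `applyDefaults` are hypothetical names introduced only for illustration. Only the default values (port 5672, durable queue, no expiration) come from the table.

```typescript
// Hypothetical shape mirroring the options table; not the library's actual types.
interface IAmqpOptionsSketch {
    server: { host: string; port?: number };
    queue: { name: string; durable?: boolean };
    message?: { expiration?: number }; // milliseconds
}

interface IResolvedAmqpOptionsSketch {
    server: { host: string; port: number };
    queue: { name: string; durable: boolean };
    message: { expiration: number | undefined }; // undefined means no expiration
}

// Fills in the documented defaults for every option the caller left out.
function applyDefaults(options: IAmqpOptionsSketch): IResolvedAmqpOptionsSketch {
    return {
        server: { host: options.server.host, port: options.server.port ?? 5672 },
        queue: { name: options.queue.name, durable: options.queue.durable ?? true },
        message: { expiration: options.message?.expiration },
    };
}

// Only the required options are given, so all defaults apply.
const resolved = applyDefaults({
    server: { host: "localhost" },
    queue: { name: "defaultQueueName" },
});
console.log(resolved.server.port, resolved.queue.durable, resolved.message.expiration);
```

With only server.host and queue.name supplied, the resolved options use port 5672, a durable queue, and no message expiration.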

Consuming from AMQP Broker

Application.create()
    .input()
        .add(amqpSource({
            server: {
                host: "localhost",
            },
            queue: {
                name: "defaultQueueName",
            },
            encoder: new JsonMessageEncoder(),
        }))
        .done()
    .dispatch({
        onSomeInputMessage(msg: ISomeInputMessage, ctx: IDispatchContext) {
            ctx.publish(SomeInputMessage, msg);
        },
    })
    // ...
    .run();

Metadata

The following metadata is available in the message handler via ctx.metadata<T>(key):

| name | description |
| --- | --- |
| AmqpMetadata.name | name of the queue this message came from |
| AmqpMetadata.Redelivered | indicates that the message has been previously delivered to this or another client |
| AmqpMetadata.Expiration | message expiration in milliseconds, as specified when publishing the message |
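A handler might use these values to tell a first delivery from a redelivery. The sketch below shows the access pattern only and makes several assumptions: `IMetadataContextSketch` is a minimal stand-in for the dispatch context, `AmqpMetadataSketch` is a stand-in enum whose string values are placeholders (the real keys are defined by the library's AmqpMetadata), and `describeDelivery` and `fakeContext` are hypothetical helpers.

```typescript
// Stand-in for the library's AmqpMetadata keys; the string values are placeholders.
enum AmqpMetadataSketch {
    Name = "sketch.name",
    Redelivered = "sketch.redelivered",
    Expiration = "sketch.expiration",
}

// Minimal stand-in for the part of the dispatch context used here.
interface IMetadataContextSketch {
    metadata<T>(key: string): T;
}

// Reads the three metadata values and summarizes them in one line.
function describeDelivery(ctx: IMetadataContextSketch): string {
    const queue = ctx.metadata<string>(AmqpMetadataSketch.Name);
    const redelivered = ctx.metadata<boolean>(AmqpMetadataSketch.Redelivered);
    const expiration = ctx.metadata<number | undefined>(AmqpMetadataSketch.Expiration);
    const ttl = expiration === undefined ? "no expiration" : `expires after ${expiration} ms`;
    return `${queue}: ${redelivered ? "redelivered" : "first delivery"}, ${ttl}`;
}

// Fake context backed by a plain object, so the sketch runs without a broker.
function fakeContext(values: Record<string, unknown>): IMetadataContextSketch {
    return { metadata: <T>(key: string): T => values[key] as T };
}

const summary = describeDelivery(fakeContext({
    [AmqpMetadataSketch.Name]: "defaultQueueName",
    [AmqpMetadataSketch.Redelivered]: true,
    [AmqpMetadataSketch.Expiration]: 30000,
}));
console.log(summary);
```

In a real handler the same three lookups would go through the ctx argument passed to the handler, using the keys exported by the AMQP module.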

Metrics

| name | description | type | tags |
| --- | --- | --- | --- |
| cookie_cutter.amqp_consumer.input_msg_received | number of messages received from the broker | increment | host, queueName, event_type, result |
| cookie_cutter.amqp_consumer.input_msg_processed | number of messages consumed successfully/unsuccessfully | increment | host, queueName, event_type, result |
| cookie_cutter.amqp_consumer.unassigned_message_count | number of messages in the queue still not assigned to a consumer | gauge | host, queueName |
| cookie_cutter.amqp_consumer.consumer_count | number of consumers for this queue | gauge | host, queueName |

amqpSink

The amqpSink function creates an output sink that handles published messages. The example below publishes JSON-encoded messages to the queue defaultQueueName.

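import { Application, JsonMessageEncoder } from "@walmartlabs/cookie-cutter-core";
import { amqpSink } from "@walmartlabs/cookie-cutter-amqp";

Application.create()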
    // ...
    .output()
        .published(amqpSink({
            server: {
                host: "localhost",
            },
            queue: {
                name: "defaultQueueName",
            },
            encoder: new JsonMessageEncoder(),
        }))
        .done()
    // ...
    .run();

Configuration

The available configuration options are:

| name | description |
| --- | --- |
| server.host | host name to connect to |
| server.port | port to connect to (default 5672) |
| queue.name | name of the queue to connect to |
| queue.durable | if true (default), the queue survives broker restarts; messages are as persistent as their queue |
| message.expiration | time to live per message in milliseconds (default is no expiration) |
| encoder | defines how message objects should be converted into the raw data sent to the AMQP broker |
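An encoder converts between message objects and the raw bytes exchanged with the broker. The sketch below illustrates that round trip with plain JSON. It is an assumption-laden stand-in, not the library's JsonMessageEncoder: the names `IMessageSketch` and `JsonEncoderSketch` and the method signatures are hypothetical.

```typescript
// Hypothetical message shape: a type name plus an arbitrary payload.
interface IMessageSketch {
    type: string;
    payload: unknown;
}

// Illustrative JSON encoder; stands in for whatever encoder is configured.
class JsonEncoderSketch {
    // Message object -> raw bytes (the direction a sink needs).
    public encode(msg: IMessageSketch): Uint8Array {
        return new TextEncoder().encode(JSON.stringify(msg.payload));
    }

    // Raw bytes -> message object (the direction a source needs).
    public decode(data: Uint8Array, typeName: string): IMessageSketch {
        return { type: typeName, payload: JSON.parse(new TextDecoder().decode(data)) };
    }
}

const encoder = new JsonEncoderSketch();
const bytes = encoder.encode({ type: "SomeInputMessage", payload: { id: 7 } });
const roundTripped = encoder.decode(bytes, "SomeInputMessage");
console.log(bytes.length, JSON.stringify(roundTripped));
```

Using the same encoder on the publishing and the consuming side is what lets a message published through amqpSink be read back by amqpSource.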

Publishing to AMQP Broker

Application.create()
    .dispatch({
        onSomeInputMessage(msg: ISomeInputMessage, ctx: IDispatchContext) {
            ctx.publish(SomeInputMessage, msg);
        }
    })
    .output()
        .published(amqpSink({
            server: {
                host: "localhost",
            },
            queue: {
                name: "defaultQueueName",
            },
            encoder: new JsonMessageEncoder(),
        }))
        .done()
    // ...
    .run();
Copyright © 2023 Walmart Inc.