Cookie Cutter

Cookie Cutter

  • Introduction
  • API
  • Help

›Introduction

Introduction

  • Getting Started
  • Inputs
  • Message Handling
  • Outputs
  • Versioning and Contribution Guide

Components

  • Dispatch Context
  • State
  • Metrics
  • Tracing
  • Logging
  • Validation
  • Encoding
  • Config
  • Testing

Modules

  • Kafka
  • Azure
  • AMQP
  • gRPC
  • ValidateJS
  • MSSQL
  • Timer
  • StatsD
  • Protobuf
  • Prometheus
  • Redis
  • S3
  • Google PubSub

Inputs

Messages

All inputs into a Cookie Cutter based service are modeled as message streams. Each message must have a type and a payload and may optionally carry additional metadata in the form of key/value pairs and an associated SpanContext for tracing purposes.

export interface IMessage {
    readonly type: string;
    readonly payload: any;
}

export declare class MessageRef {
    readonly payload: IMessage;
    readonly spanContext: SpanContext;
    constructor(meta: IMetadata, payload: IMessage, spanContext?: SpanContext);
    metadata<T>(key: string): T;
    release(value?: any, error?: Error): Promise<void>;
    // ...
}

Input Sources

In order to connect a Cookie Cutter service to an external message source like a Kafka broker it requires an implementation of the IInputSource interface. Cookie Cutter ships with many input source types out of the box, however it can sometimes be useful to implement custom input sources for specialized use cases.

export interface IInputSource {
    start(): AsyncIterableIterator<MessageRef>;
    stop(): Promise<void>;
}

The framework will invoke the start method to create an asynchronous iterator that will transport the incoming messages from the input source into the framework's internals. When the application is requested to shutdown (e.g. on CTRL+C or a signal was sent to the process) it will call the stop method which is expected to complete the iterator previously returned by start.

Finite vs. Infinite Streams

Input sources may create iterators that are either finite or infinite. Infinite streams make a lot of sense when interacting with message broker systems and are the predominate use case for input sources, however finite streams can be very useful for creating service that run batch jobs or on a schedule.

The lifecycle of a Cookie Cutter application is determined by its input source(s). It will try to process all messages as they flow in unless the application is requested to shutdown. This behavior can be used to create batch jobs with custom input sources where the input sources queries all the data that needs to be processed in the current batch and yields each item one at a time. Once all messages are processed the application will terminate. In the case of infinite iterators the application will never terminate and keep processing messages as they flow in.

Multiple Inputs

You may add more than one input source to a Cookie Cutter application. The framework will process the sources in a round-robin fashion as messages are available; if some of the sources only generate message sporadically it will keep processing messages from sources that have messages available to prevent the round-robin logic from stalling the application. Keep in mind though: there are no ordering guarantees for messages originating from different sources.

Example Input Source

export class ExampleInputSource implements IInputSource {
    private readonly done: boolean = false;

    public async *start(): AsyncIterableIterator<MessageRef> {
        for (let i = 0; i < 10000; i++) {
            // ensure that the iterator completes
            // when stop is called
            if (this.done) {
                break;
            }

            const span = globalTracer().startSpan("example");
            const data: IMessage = {
                type: "Example",
                payload: { value: i }
            };
            const msg = new MessageRef(
                {
                    // some additional metadata
                    // associated with this message
                    ["eventTime"]: new Date().toISOString(),
                },
                // the payload of the message that will be
                // passed to the message handler
                data,
                // the root trace for APM
                span.context(), 
            )
            msg.once("released", () => {
                span.finish();
            });

            yield msg;
            await sleep(100);
        }
    }

    public async stop(): Promise<void> {
        this.done = true;
    }
}

Application.create()
    .logger(new ConsoleLogger())
    .input()
        .add(new ExampleInputSource())
        .done()
    .dispatch({
        onExample(msg: any, ctx: IDispatchContext): void {
            ctx.logger.info(`'${msg.value}' generated at ... ${ctx.metadata<string>("eventTime")}`);
        }
    })
    .run();
1 generated at ... 2019-05-01T18:02:28.502Z
2 generated at ... 2019-05-01T18:02:28.602Z
3 generated at ... 2019-05-01T18:02:28.702Z
>>> CTRL+C
← Getting StartedMessage Handling →
  • Messages
  • Input Sources
  • Finite vs. Infinite Streams
  • Multiple Inputs
  • Example Input Source
Cookie Cutter
Docs
IntroductionKafka
More
Blog
Copyright © 2023 Walmart Inc.