gRPC

Although Cookie Cutter is primarily a framework for message streaming APIs, it also supports writing gRPC services and invoking other gRPC services from sinks or message handlers. Currently the only limitation is that client-side streams are not supported when implementing gRPC servers; server-side streams are supported. The gRPC integration is intertwined with the Proto module and currently only supports protobuf encoders generated with protobufjs.
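
The encoders in question are the message classes emitted by protobufjs' static code generation (typically produced with the pbjs and pbts command line tools). As a rough, non-authoritative illustration of what such a generated class provides (the module path and namespace below are assumptions and depend on how the bundle was generated):

import proto from "bundle"; // pbjs-generated static module; path and namespace assumed

// every generated message class can validate, encode and decode plain objects
const problem = proto.SampleRequest.verify({ id: 1 }); // null if the object is valid
const bytes: Uint8Array = proto.SampleRequest.encode({ id: 1 }).finish();
const decoded = proto.SampleRequest.decode(bytes); // instance of proto.SampleRequest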

Defining Service Definitions

For the gRPC service to accept requests for our operations and to use the encoders generated with protobufjs, we first need to create a service definition.

syntax = "proto3";

package sample;

message SampleRequest {
    int32 id = 1;
}

message SampleResponse {
    string name = 1;
}

service SampleService {
    rpc SampleOperation(SampleRequest) returns (SampleResponse);
    rpc SampleStream(SampleRequest) returns (stream SampleResponse);
}

Service definitions follow a simple pattern: for each operation they reference the request and response encoder classes generated by protobufjs and declare whether the request or response is streamed. In addition, you may want to define an interface for gRPC clients.

import { SpanContext } from "opentracing";
import proto from "bundle"; // these are the pbjs generated types

const SampleServiceDefinition = {
    SampleOperation: {
        path: "/sample.SampleService/SampleOperation",
        requestType: proto.SampleRequest,
        requestStream: false,
        responseType: proto.SampleResponse,
        responseStream: false,
    },
    SampleStream: {
        path: "/sample.SampleService/SampleStream",
        requestType: proto.SampleRequest,
        requestStream: false,
        responseType: proto.SampleResponse,
        responseStream: true,
    },
};

interface ISampleService {
    SampleOperation(req: SampleRequest, context: SpanContext): Promise<SampleResponse>;
    SampleStream(req: SampleRequest, context: SpanContext): AsyncIterableIterator<SampleResponse>;
}

gRPC Server

RPC

A simple gRPC server for the above service definition looks like the example below. Cookie Cutter treats every incoming gRPC request as a message that flows through the framework's pipeline. The main difference is that the message handler's function is named after the gRPC operation rather than after the request's message type. The message handler now also has a return value, which represents the gRPC response. A message handler may in addition publish or store additional messages; the gRPC request is completed by sending the response only after all publish and store outputs have been processed, including any potential retries due to optimistic concurrency issues.

import { Application, ErrorHandlingMode, IDispatchContext, ParallelismMode } from "@walmartlabs/cookie-cutter-core";
import { grpcSource } from "@walmartlabs/cookie-cutter-grpc";

Application.create()
    .input()
        .add(grpcSource({
            host: "localhost",
            port: 5000,
            definitions: [ SampleServiceDefinition ]
        }))
        .done()
    .dispatch({
        onSampleOperation: (req: ISampleRequest, ctx: IDispatchContext): ISampleResponse => {
            return { name: req.id.toString() };
        },
    })
    .run(ErrorHandlingMode.LogAndContinue, ParallelismMode.Rpc);
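
As mentioned above, a handler may also publish or store additional messages before the response is sent. The sketch below is illustrative only: SampleReceived stands in for whatever output message class the service defines, and someOutputSink stands in for an output sink (e.g. Kafka) configured elsewhere.

Application.create()
    .input()
        .add(grpcSource({ host: "localhost", port: 5000, definitions: [ SampleServiceDefinition ] }))
        .done()
    .output()
        .published(someOutputSink) // stand-in for a real sink, configured elsewhere
        .done()
    .dispatch({
        onSampleOperation: (req: ISampleRequest, ctx: IDispatchContext): ISampleResponse => {
            // SampleReceived is a hypothetical output message class; the gRPC response
            // below is only sent back once this published message has been processed
            ctx.publish(SampleReceived, { id: req.id });
            return { name: req.id.toString() };
        },
    })
    .run(ErrorHandlingMode.LogAndContinue, ParallelismMode.Rpc);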

Server-Side Streams

The example below accepts gRPC client requests, replies with a stream of 10 messages (one per second), and then ends the stream.

Application.create()
    .input()
        .add(grpcSource({
            host: "localhost",
            port: 5000,
            definitions: [ SampleServiceDefinition ]
        }))
        .add(intervalSource({
            timeout: 1000,
        }))
        .done()
    .dispatch({
        consumers: [] as Array<{ stream: IResponseStream<ISampleResponse>, peer: string, count: number }>,
        // regular methods (not arrow functions) so `this` refers to the dispatch target
        onSampleStream(req: ISampleRequest, ctx: IDispatchContext): void {
            const stream = ctx.metadata<IResponseStream<ISampleResponse>>(GrpcMetadata.ResponseStream);
            const peer = ctx.metadata<string>(GrpcMetadata.Peer);
            this.consumers.push({ stream, peer, count: 0 });
        },
        async onInterval(msg: IInterval, ctx: IDispatchContext): Promise<void> {
            for (const item of this.consumers) {
                const { stream } = item;
                if (item.count === 10) {
                    stream.close();
                } else {
                    ctx.logger.info(`sending msg to ${item.peer}`);
                    await stream.send({ name: "hello " + item.count });
                }
                item.count++;
            }
        },
    })
    .run(ErrorHandlingMode.LogAndContinue, ParallelismMode.Rpc);
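
The example above only relies on the send and close methods of the response stream handle. Roughly, and only as an approximation inferred from this usage (consult the module's typings for the authoritative declaration), the handle looks like this:

// approximate shape inferred from the example above, not the library's exact declaration
interface IResponseStream<TResponse> {
    send(msg: TResponse): Promise<void>;
    close(): Promise<void>;
}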

Metadata

The following metadata is available for gRPC requests:

Name                           Description
GrpcMetadata.Peer              the host and port of the client sending the request
GrpcMetadata.OperationPath     the full name of the operation invoked by the client
GrpcMetadata.ResponseStream    handle to an IResponseStream<TResponse> object that can be used for server-side streams
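
All of these values are read through ctx.metadata on the dispatch context, just like the response stream in the streaming example above. A minimal sketch, assuming Peer and OperationPath are exposed as strings:

onSampleOperation: (req: ISampleRequest, ctx: IDispatchContext): ISampleResponse => {
    // read gRPC request metadata from the dispatch context
    const peer = ctx.metadata<string>(GrpcMetadata.Peer);
    const path = ctx.metadata<string>(GrpcMetadata.OperationPath);
    ctx.logger.info(`handling ${path} for ${peer}`);
    return { name: req.id.toString() };
},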

Error Handling

Any errors thrown by a message handler will be relayed to the gRPC client. This only works in LogAndContinue mode, though; if the service has to run in LogAndRetry mode, a message handler can return an Error instead of throwing it, which propagates to the gRPC client in the same way.

Application.create()
    .input()
        .add(grpcSource(...))
        .done()
    .dispatch({
        onSampleOperation: (req: ISampleRequest, ctx: IDispatchContext): ISampleResponse => {
            // the preferred way to error out a call
            throw new Error("bad input");
        },
    })
    .run(ErrorHandlingMode.LogAndContinue, ParallelismMode.Rpc);

Application.create()
    .input()
        .add(grpcSource(...))
        .done()
    .dispatch({
        onSampleOperation: (req: ISampleRequest, ctx: IDispatchContext): ISampleResponse | Error => {
            // use this pattern if service is LogAndRetry
            return new Error("bad input");
        },
    })
    .run(ErrorHandlingMode.LogAndRetry, ParallelismMode.Rpc);
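
From the client's perspective both variants look the same: the relayed error rejects the promise returned by the call. A minimal sketch, assuming a grpcClient set up as shown in the gRPC Client section below:

try {
    await client.SampleOperation({ id: 1 }, ctx.trace.context);
} catch (e) {
    // the Error thrown (or returned) by the server's message handler surfaces here
    ctx.logger.error("SampleOperation failed", e);
}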

Runtime Options

It is recommended to run gRPC servers in LogAndContinue + RPC mode, as incoming requests usually don't require any kind of ordering guarantees and can be executed independently of each other.

gRPC Client

gRPC clients can be created with the grpcClient helper function and should be registered as services with Cookie Cutter, since they require initialization and disposal.

Application.create()
    .services()
        .add("some-service", grpcClient<ISampleService>({
            endpoint: "localhost:5000",
            definition: SampleServiceDefinition,
        }))
        .done()
    .dispatch({
        onSomeInput: async (msg: ISomeInput, ctx: IDispatchContext): Promise<void> => {
            const client = ctx.services.get<ISampleService>("some-service");

            // invoke regular RPC call
            const response = await client.SampleOperation({ id: 1 }, ctx.trace.context);

            // consume server-side stream
            const stream = client.SampleStream({ id: 1 }, ctx.trace.context);
            for await (const item of stream) {
                // do something with each streamed item
            }
        },
    })