AMQP
amqpSource
The amqpSource function creates a new input source that receives messages from RabbitMQ or another message broker following the AMQP 0-9-1 protocol.
The example below starts consuming JSON encoded messages from queue defaultQueueName.
```typescript
Application.create()
    .input()
        .add(amqpSource({
            server: {
                host: "localhost",
            },
            queue: {
                name: "defaultQueueName",
            },
            encoder: new JsonMessageEncoder(),
        }))
        .done()
    // ...
    .run();
```
Configuration
The available configuration options are:
| name | description |
|---|---|
| server.host | host name to connect to |
| server.port | port to connect to (default 5672) |
| queue.name | name of queue to connect to |
| queue.durable | if true (default), the queue survives broker restarts; messages are as persistent as their queue |
| message.expiration | time to live per message in milliseconds (default is no expiration) |
| encoder | defines how the raw data received from the AMQP broker is converted into message objects |
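
As a rough illustration of how the options in the table fit together, the sketch below models them as a TypeScript type and fills in the documented defaults (port 5672, durable queue, no expiration). The names `AmqpOptionsSketch` and `withDefaults` are hypothetical and exist only for this example; the library's own configuration types may differ, and the `encoder` option is left out to keep the sketch self-contained.

```typescript
// Hypothetical shape mirroring the options table above; not the library's own type.
interface AmqpOptionsSketch {
    server: { host: string; port?: number };
    queue: { name: string; durable?: boolean };
    message?: { expiration?: number }; // time to live in milliseconds
}

// Fills in the defaults documented in the table: port 5672 and durable = true.
// message.expiration stays undefined, meaning messages do not expire.
function withDefaults(options: AmqpOptionsSketch) {
    return {
        server: { host: options.server.host, port: options.server.port ?? 5672 },
        queue: { name: options.queue.name, durable: options.queue.durable ?? true },
        message: { expiration: options.message?.expiration },
    };
}

const resolved = withDefaults({
    server: { host: "localhost" },
    queue: { name: "defaultQueueName" },
    message: { expiration: 60000 }, // expire messages after one minute
});
console.log(resolved.server.port, resolved.queue.durable, resolved.message.expiration);
```

Only `server.host` and `queue.name` have to be supplied in this sketch; everything else falls back to the defaults listed in the table.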
Consuming from AMQP Broker
```typescript
Application.create()
    .input()
        .add(amqpSource({
            server: {
                host: "localhost",
            },
            queue: {
                name: "defaultQueueName",
            },
            encoder: new JsonMessageEncoder(),
        }))
        .done()
    .dispatch({
        onSomeInputMessage(msg: ISomeInputMessage, ctx: IDispatchContext) {
            ctx.publish(SomeInputMessage, msg);
        },
    })
    // ...
    .run();
```
Metadata
The following metadata is available in the message handler via `ctx.metadata<T>(key)`:
| name | description |
|---|---|
| AmqpMetadata.name | name of queue this message came from |
| AmqpMetadata.Redelivered | indicates that the message has previously been delivered to this or another client |
| AmqpMetadata.Expiration | message expiration in milliseconds as specified when publishing the message |
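
A handler might use this metadata to treat redelivered messages differently. The sketch below is self-contained: `MetadataContextSketch`, the `Keys` constants, and their string values are stand-ins invented for the example, and the expiration is assumed to arrive as a number. In a real handler you would pass the `AmqpMetadata` members to `ctx.metadata<T>(key)` instead.

```typescript
// Stand-in for the part of the dispatch context used here (hypothetical type).
interface MetadataContextSketch {
    metadata<T>(key: string): T;
}

// Placeholder keys; the real values come from AmqpMetadata and may differ.
const Keys = {
    Redelivered: "redelivered",
    Expiration: "expiration",
};

// Builds a short log line from the delivery metadata of the current message.
function describeDelivery(ctx: MetadataContextSketch): string {
    const redelivered = ctx.metadata<boolean>(Keys.Redelivered);
    // Assumption: expiration is a number of milliseconds, or undefined if not set.
    const expiration = ctx.metadata<number | undefined>(Keys.Expiration);
    const ttl = expiration === undefined ? "no expiration" : `expires after ${expiration} ms`;
    return `${redelivered ? "redelivered" : "first delivery"}, ${ttl}`;
}

// Fake context backed by a plain object, standing in for the framework's context.
function fakeContext(values: Record<string, unknown>): MetadataContextSketch {
    return { metadata: <T>(key: string) => values[key] as T };
}

console.log(describeDelivery(fakeContext({ redelivered: true, expiration: 5000 })));
```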
Metrics
| name | description | type | tags |
|---|---|---|---|
| cookie_cutter.amqp_consumer.input_msg_received | number of messages received from the broker | increment | host, queueName, event_type, result |
| cookie_cutter.amqp_consumer.input_msg_processed | number of messages consumed successfully/unsuccessfully | increment | host, queueName, event_type, result |
| cookie_cutter.amqp_consumer.unassigned_message_count | number of messages in the queue still not assigned to a consumer | gauge | host, queueName |
| cookie_cutter.amqp_consumer.consumer_count | number of consumers for this queue | gauge | host, queueName |
amqpSink
The amqpSink function creates an output sink that handles published messages. The example below starts publishing JSON encoded messages to queue defaultQueueName.
```typescript
Application.create()
    // ...
    .output()
        .published(amqpSink({
            server: {
                host: "localhost",
            },
            queue: {
                name: "defaultQueueName",
            },
            encoder: new JsonMessageEncoder(),
        }))
        .done()
    // ...
    .run();
```
Configuration
The available configuration options are:
| name | description |
|---|---|
| server.host | host name to connect to |
| server.port | port to connect to (default 5672) |
| queue.name | name of queue to connect to |
| queue.durable | if true (default), the queue survives broker restarts; messages are as persistent as their queue |
| message.expiration | time to live per message in milliseconds (default is no expiration) |
| encoder | defines how published message objects are converted into the raw data sent to the AMQP broker |
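
To make the role of the encoder concrete, the sketch below shows a JSON round trip between a message object and raw bytes. `JsonCodecSketch` is a hypothetical stand-in written for this example and is not the implementation of `JsonMessageEncoder`; it only illustrates the idea that a sink serializes the payload before sending and a source parses it after receiving.

```typescript
// Hypothetical stand-in for a JSON encoder; not the library's JsonMessageEncoder.
class JsonCodecSketch {
    // Object -> raw bytes, as a sink would do before handing data to the broker.
    public encode(payload: unknown): Uint8Array {
        return new TextEncoder().encode(JSON.stringify(payload));
    }

    // Raw bytes -> object, as a source would do after receiving data from the broker.
    public decode<T>(data: Uint8Array): T {
        return JSON.parse(new TextDecoder().decode(data)) as T;
    }
}

const codec = new JsonCodecSketch();
const bytes = codec.encode({ orderId: 42, status: "created" });
const restored = codec.decode<{ orderId: number; status: string }>(bytes);
console.log(restored.orderId, restored.status);
```

Both sides of a queue need to agree on the encoding, which is why the source and sink examples in this section are configured with the same encoder.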
Publishing to AMQP Broker
```typescript
Application.create()
    .dispatch({
        onSomeInputMessage(msg: ISomeInputMessage, ctx: IDispatchContext) {
            ctx.publish(SomeInputMessage, msg);
        },
    })
    .output()
        .published(amqpSink({
            server: {
                host: "localhost",
            },
            queue: {
                name: "defaultQueueName",
            },
            encoder: new JsonMessageEncoder(),
        }))
        .done()
    // ...
    .run();
```