Package detail

rhea-promise

amqp1.4mApache-2.03.0.3

A Promisified layer over rhea AMQP client

amqp, rhea, promise, amqp-10

readme

rhea-promise

A Promisified layer over rhea AMQP client.

Pre-requisite

  • Node.js version: 6.x or higher.
  • We would still encourage you to install the latest available LTS version at any given time from https://nodejs.org. It is a good practice to always install the latest available LTS version of node.js.
  • Installing node.js on Windows or macOS is very simple with available installers on the node.js website. If you are using a linux based OS, then you can find easy to follow, one step installation instructions over here.

Installation

npm install rhea-promise

Debug logs

You can set the following environment variable to get the debug logs.

  • Getting debug logs from this library
    export DEBUG=rhea-promise*
    
  • Getting debug logs from this and the rhea library
    export DEBUG=rhea*
    
  • If you are not interested in viewing the message transformation (which consumes lot of console/disk space) then you can set the DEBUG environment variable as follows:
    export DEBUG=rhea*,-rhea:raw,-rhea:message,-rhea-promise:eventhandler,-rhea-promise:translate
    

Logging to a file

  • Set the DEBUG environment variable as shown above and then run your test script as follows:
    • Logging statements from you test script go to out.log and logging statement from the sdk go to debug.log.
      node your-test-script.js > out.log 2>debug.log
      
    • Logging statements from your test script and the sdk go to the same file out.log by redirecting stderr to stdout (&1), and then redirect stdout to a file:
      node your-test-script.js >out.log 2>&1
      
    • Logging statements from your test script and the sdk go to the same file out.log.
        node your-test-script.js &> out.log
      

Notable differences between rhea and rhea-promise

Error propagation to the parent entity

  • In AMQP, for two peers to communicate successfully, different entities (Container, Connection, Session, Link) need to be created. There is a relationship between those entities.
    • 1 Container can have 1..* Connections.
    • 1 Connection can have 1..* Sessions.
    • 1 Session can have 1..* Links.
    • A Link can have the role of Receiver or Sender.
  • Each entity (connection, session, link) maintains its own state to let other entities know about what it is doing. Thus,
    • if the connection goes down then, everything on the connection - sessions, links are down.
    • if a session goes down then, all the the links on that session are down.
  • When an entity goes down rhea emits *_error and *_close events, where * can be "sender", "receiver", "session", "connection". If event listeners for the aforementioned events are not added at the appropriate level, then rhea propagates those events to its parent entity. If they are not handled at the Container level (uber parent), then they are transformed into an error event. This would cause your application to crash if there is no listener added for the error event.
  • In rhea-promise, the library creates, equivalent objects Connection, Session, Sender, Receiver and wraps objects from rhea within them. It adds event listeners to all the possible events that can occur at any level and re-emits those events with the same arguments as one would expect from rhea. This makes it easy for consumers of rhea-promise to use the EventEmitter pattern. Users can efficiently use different event emitter methods like .once(), .on(), .prependListeners(), etc. Since rhea-promise add those event listeners on rhea objects, the errors will never be propagated to the parent entity. This can be good as well as bad depending on what you do.
    • Good - *_error events and *_close events emitted on an entity will not be propagated to it's parent. Thus ensuring that errors are handled at the right level.
    • Bad - If you do not add listeners for *_error and *_close events at the right level, then you will never know why an entity shutdown.

We believe our design enforces good practices to be followed while using the event emitter pattern.

Examples

Please take a look at the sample.env file for examples on how to provide the values for different parameters like host, username, password, port, senderAddress, receiverAddress, etc.

Sending a message via Sender.

  • Running the example from terminal: > ts-node ./examples/send.ts.

NOTE: If you are running the sample with .env config file, then please run the sample from the directory that contains .env config file.

import {
  Connection, Sender, EventContext, Message, ConnectionOptions, Delivery, SenderOptions
} from "rhea-promise";
import * as dotenv from "dotenv"; // Optional for loading environment configuration from a .env (config) file
dotenv.config();

const host = process.env.AMQP_HOST || "host";
const username = process.env.AMQP_USERNAME || "sharedAccessKeyName";
const password = process.env.AMQP_PASSWORD || "sharedAccessKeyValue";
const port = parseInt(process.env.AMQP_PORT || "5671");
const senderAddress = process.env.SENDER_ADDRESS || "address";

async function main(): Promise<void> {
  const connectionOptions: ConnectionOptions = {
    transport: "tls",
    host: host,
    hostname: host,
    username: username,
    password: password,
    port: port,
    reconnect: false
  };
  const connection: Connection = new Connection(connectionOptions);
  const senderName = "sender-1";
  const senderOptions: SenderOptions = {
    name: senderName,
    target: {
      address: senderAddress
    },
    onError: (context: EventContext) => {
      const senderError = context.sender && context.sender.error;
      if (senderError) {
        console.log(">>>>> [%s] An error occurred for sender '%s': %O.",
          connection.id, senderName, senderError);
      }
    },
    onSessionError: (context: EventContext) => {
      const sessionError = context.session && context.session.error;
      if (sessionError) {
        console.log(">>>>> [%s] An error occurred for session of sender '%s': %O.",
          connection.id, senderName, sessionError);
      }
    }
  };

  await connection.open();
  const sender: Sender = await connection.createSender(senderOptions);
  const message: Message = {
    body: "Hello World!!",
    message_id: "12343434343434"
  };

  // Please, note that we are not awaiting on sender.send()
  // You will notice that `delivery.settled` will be `false`.
  const delivery: Delivery = sender.send(message);
  console.log(">>>>>[%s] Delivery id: %d, settled: %s",
    connection.id,
    delivery.id,
    delivery.settled);

  await sender.close();
  await connection.close();
}

main().catch((err) => console.log(err));

Sending a message via AwaitableSender

  • Running the example from terminal: > ts-node ./examples/awaitableSend.ts.
import {
  Connection, Message, ConnectionOptions, Delivery, AwaitableSenderOptions, AwaitableSender
} from "rhea-promise";

import * as dotenv from "dotenv"; // Optional for loading environment configuration from a .env (config) file
dotenv.config();

const host = process.env.AMQP_HOST || "host";
const username = process.env.AMQP_USERNAME || "sharedAccessKeyName";
const password = process.env.AMQP_PASSWORD || "sharedAccessKeyValue";
const port = parseInt(process.env.AMQP_PORT || "5671");
const senderAddress = process.env.SENDER_ADDRESS || "address";

async function main(): Promise<void> {
  const connectionOptions: ConnectionOptions = {
    transport: "tls",
    host: host,
    hostname: host,
    username: username,
    password: password,
    port: port,
    reconnect: false
  };
  const connection: Connection = new Connection(connectionOptions);
  const senderName = "sender-1";
  const awaitableSenderOptions: AwaitableSenderOptions = {
    name: senderName,
    target: {
      address: senderAddress
    },
  };

  await connection.open();
  // Notice that we are awaiting on the message being sent.
  const sender: AwaitableSender = await connection.createAwaitableSender(
    awaitableSenderOptions
  );

  for (let i = 0; i < 10; i++) {
    const message: Message = {
      body: `Hello World - ${i}`,
      message_id: i
    };
    // Note: Here we are awaiting for the send to complete.
    // You will notice that `delivery.settled` will be `true`, irrespective of whether the promise resolves or rejects.
    const delivery: Delivery = await sender.send(message, {
      timeoutInSeconds: 10
    });
    console.log(
      "[%s] await sendMessage -> Delivery id: %d, settled: %s",
      connection.id,
      delivery.id,
      delivery.settled
    );
  }

  await sender.close();
  await connection.close();
}

main().catch((err) => console.log(err));

Receiving a message

  • Running the example from terminal: > ts-node ./examples/receive.ts.

NOTE: If you are running the sample with .env config file, then please run the sample from the directory that contains .env config file.

import {
  Connection, Receiver, EventContext, ConnectionOptions, ReceiverOptions, delay, ReceiverEvents
} from "rhea-promise";
import * as dotenv from "dotenv"; // Optional for loading environment configuration from a .env (config) file
dotenv.config();

const host = process.env.AMQP_HOST || "host";
const username = process.env.AMQP_USERNAME || "sharedAccessKeyName";
const password = process.env.AMQP_PASSWORD || "sharedAccessKeyValue";
const port = parseInt(process.env.AMQP_PORT || "5671");
const receiverAddress = process.env.RECEIVER_ADDRESS || "address";

async function main(): Promise<void> {
  const connectionOptions: ConnectionOptions = {
    transport: "tls",
    host: host,
    hostname: host,
    username: username,
    password: password,
    port: port,
    reconnect: false
  };
  const connection: Connection = new Connection(connectionOptions);
  const receiverName = "receiver-1";
  const receiverOptions: ReceiverOptions = {
    name: receiverName,
    source: {
      address: receiverAddress
    },
    onSessionError: (context: EventContext) => {
      const sessionError = context.session && context.session.error;
      if (sessionError) {
        console.log(">>>>> [%s] An error occurred for session of receiver '%s': %O.",
          connection.id, receiverName, sessionError);
      }
    }
  };

  await connection.open();
  const receiver: Receiver = await connection.createReceiver(receiverOptions);
  receiver.on(ReceiverEvents.message, (context: EventContext) => {
    console.log("Received message: %O", context.message);
  });
  receiver.on(ReceiverEvents.receiverError, (context: EventContext) => {
    const receiverError = context.receiver && context.receiver.error;
    if (receiverError) {
      console.log(">>>>> [%s] An error occurred for receiver '%s': %O.",
        connection.id, receiverName, receiverError);
    }
  });
  // sleeping for 2 mins to let the receiver receive messages and then closing it.
  await delay(120000);
  await receiver.close();
  await connection.close();
}

main().catch((err) => console.log(err));

Building the library

  • Clone the repo
    git clone https://github.com/amqp/rhea-promise.git
    
  • Install typescript, ts-node globally
    npm i -g typescript
    npm i -g ts-node
    
  • NPM install from the root of the package
    npm i
    
  • Build the project
    npm run build
    

AMQP Protocol specification

Amqp protocol specification can be found here.

changelog

3.0.3 - (2024-06-12)

  • Release the resources if Session.createReceiver() rejects due to timeout.

3.0.2 - (2024-05-02)

  • Set the max listener limit to 1000 for RheaConnection

3.0.1 - (2023-05-05)

  • Fix a bug where Connection constructor isn't setting operationTimeoutInSeconds correctly.

3.0.0 - (2023-03-02)

  • Update rhea dependency to the 3.x major version.
  • Update dev dependency typescript to ~4.3.0

Breaking changes

  • rhea has one breaking change introduced in version 3.x that impact this library: The TLS options type to Container.listen() now requires a transport property which is either "ssl" or "tls".

2.1.0 - (2021-06-30)

  • Exposes a new receiver.drainCredit() method that calls through to rhea's receiver.drain_credit() method.
  • Update rhea minimum version to 2.0.3

2.0.0 - (2021-06-03)

  • Updates rhea dependency to the 2.x major version, and the tslib dependency to the 2.x major version.
  • Adds CreateRequestResponseLinkOptions as an exported interface.

Breaking changes

  • rhea has 1 breaking change introduced in version 2.x: timestamps are not deserialized as Date objects instead of numbers.
  • Updates AwaitableSendOptions to include the optional fields tag and format which were previously passed to AwaitableSender.send(). These fields are no longer positional arguments on AwaitableSender.send().
  • Adds SenderSendOptions to include the optional fields tag and format which were previously passed to Sender.send(). These fields are no longer positional arguments on Sender.send().
  • Removes sendTimeoutInSeconds from the AwaitableSendOptions that is passed to the AwaitableSender constructor. timeoutInSeconds on AwaitableSenderOptions can still be used to set the timeout for individual AwaitableSender.send() invocations.
  • Renames the following TypeScript interfaces to better match the methods they apply to:
    • SenderOptionsWithSession -> CreateSenderOptions
    • AwaitableSenderOptionsWithSession -> CreateAwaitableSenderOptions
    • ReceiverOptionsWithSession -> CreateReceiverOptions

1.2.1 - (2021-04-15)

  • createSession, createReceiver, and createSender methods now only close underlying rhea analogue when cancelled if the resource has already been opened.

1.2.0 - 2021-03-25

  • Exposes the incoming getter on the Session that lets accessing size and capacity of the incoming deliveries #79.
  • Updates the error message for the AbortError to be a standard message The operation was aborted..

1.1.0 - 2021-02-08

  • All async methods now take a signal that can be used to cancel the operation. Fixes #48
  • Added a timeoutInSeconds parameter to the send method on the AwaitableSender that overrides the timeout value for the send operation set when creating the sender.
  • When the error event is fired when closing the sender/receiver link, surface errors occurring on the sender/receiver context if none are found on the session context. Details can be found in PR #55
  • Updated minimum version of rhea to ^1.0.24. Details can be found in PR 68

1.0.0 - 2019-06-27

  • Updated minimum version of rhea to ^1.0.8.
  • Added a read only property id to the Session object. The id property is created by concatenating session's local channel, remote channel and the connection id "local-<number>_remote-<number>_<connection-id>", thus making it unique for that connection.
  • Improved log statements by adding the session id and the sender, receiver name to help while debugging applications.
  • Added options to Link.close({closeSession: true | false}), thus the user can specify whether the underlying session should be closed while closing the Sender|Receiver. Default is true.
  • Improved open and close operations on Connection, Session and Link by creating timer in case the connection gets disconnected. Fixes #41.
  • The current Sender does not have a provision of "awaiting" on sending a message. The user needs to add handlers on the Sender for accepted, rejected, released, modified to ensure whether the message was successfully sent. Now, we have added a new AwaitableSender which adds the handlers internally and provides an awaitable send() operation to the customer. Fixes #45.
  • Exporting new Errors:
    • InsufficientCreditError: Defines the error that occurs when the Sender does not have enough credit.
    • SendOperationFailedError: Defines the error that occurs when the Sender fails to send a message.

0.2.0 - 2019-05-17

  • Updated OperationTimeoutError to be a non-AMQP Error as pointed out in #42. Fixed in PR.

0.1.15 - 2019-04-10

  • Export rhea types for Typed. PR.
  • Export rhea types for WebSocketImpl and WebSocketInstance. PR.
  • When opening a connection fails with no error, use standard error message. PR.

0.1.14 - 2019-03-19

  • Allow websockets usage on a connection without creating a container first. PR.
  • New function removeAllSessions() on the connection to clear the internal map in rhea to ensure sessions are not reconnected on the next connection.open() call. PR.
  • Remove all event listeners on link and session objects when close() is called on them. PR

0.1.13 - 2018-12-11

  • Throw OperationTimeoutError when a Promise to create/close an entity is rejected.

0.1.12 - 2018-11-16

  • Fix a minor bug in receiver creation.

0.1.11 - 2018-11-15

  • Added checks for some event handler methods to exist before logging information that uses node's event handlers inbuilt functions.
  • Improved error checking while creating the receiver.

0.1.10 - 2018-11-01

  • Provided an option to add an event handler for "settled" event on the Receiver.

0.1.9 - 2018-10-24

  • With the usage of importHelpers, the tslib will be needed in package.json for installers using older versions of npm (or using yarn). PR.

0.1.8 - 2018-10-22

  • Allow setting drain property on the receiver PR.

0.1.7 - 2018-10-19

  • Fixed a bug while populating the connectionId PR.

0.1.6 - 2018-09-28

  • property actionInitiated is now of type number which is incremented when the create, close action on an entity is under process and decremented when the action completes (succeeeded or failed).

0.1.5 - 2018-09-27

  • Improved log statements for better debugging.
  • Any type of error event will be emitted with a tick delay. This would give enough time for the create() methods to resolve the promise.
  • Added a new boolean property actionInitiated which indicates whether the create, close action on an entity is under process.

0.1.4 - 2018-09-25

  • options is a required property of Connection and Container.

0.1.3 - 2018-09-25

  • Transform relevant objects in rhea EventContext to rhea-promise objects.
  • Ensure that container.createConnection() creates a connection on that container and not on the default container.

0.1.2 - 2018-09-20

  • TS target to ES2015. This should help us support node.js version 6.x and above.

0.1.1 - 2018-09-20

  • Update homepage, repository and bug urls in package.json

0.1.0 - 2018-09-20

  • Initial version of rhea-promise.